

# Amazon EventBridge Pipes
<a name="eb-pipes"></a>

Amazon EventBridge Pipes connects sources to targets. Pipes are intended for point-to-point integrations between supported [sources](eb-pipes-event-source.md) and [targets](eb-pipes-event-target.md), with support for advanced transformations and [enrichment](pipes-enrichment.md). EventBridge Pipes reduces the need for specialized knowledge and integration code when developing event-driven architectures, and fosters consistency across your company’s applications. To set up a pipe, you choose the source, add optional filtering, define optional enrichment, and choose the target for the event data.

**Note**  
You can also route events using event buses. Event buses are well-suited for many-to-many routing of events between event-driven services. For more information, see [Event buses in Amazon EventBridge](eb-event-bus.md).

## How EventBridge Pipes work
<a name="pipes-how-it-works"></a>

At a high level, here's how EventBridge Pipes works:

1. You create a pipe in your account. This includes:
   + Specifying one of the supported [event sources](eb-pipes-event-source.md) from which you want your pipe to receive events.
   + Optionally, configuring a filter so that the pipe only processes a subset of the events it receives from the source.
   + Optionally, configuring an enrichment step that enhances the event data before sending it to the target.
   + Specifying one of the supported [targets](eb-pipes-event-target.md) to which you want your pipe to send events.

1. The event source begins sending events to the pipe, and the pipe processes the event before sending it to the target.
   + If you have configured a filter, the pipe evaluates the event and only sends it to the target if it matches that filter.

     You are only charged for those events that match the filter.
   + If you have configured an enrichment, the pipe performs that enrichment on the event before sending it to the target.

     If the events are batched, the enrichment maintains the ordering of the events in the batch.

![A source sends events to a pipe, which filters and routes matching events to the target.](https://docs.aws.amazon.com/eventbridge/latest/userguide/images/pipes-overview_eventbridge_architectural.svg)


For example, you could use a pipe to build part of an e-commerce system. Suppose you have an API that returns customer information, such as shipping addresses.

1. You create a pipe with the following:
   + An Amazon SQS *order received* message queue as the source.
   + An EventBridge API destination as the enrichment.
   + An AWS Step Functions state machine as the target.

1. Then, when an *order received* message appears in the queue, it is sent to your pipe.

1. The pipe sends that data to the EventBridge API destination enrichment, which returns the customer information for that order.

1. Lastly, the pipe sends the enriched data to the AWS Step Functions state machine, which processes the order.
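
A pipe like the one above could be defined with the [create-pipe](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/pipes/create-pipe.html) AWS CLI command. The following input is a minimal sketch only; the pipe name, role, and all resource ARNs are hypothetical placeholders, and a real pipe usually needs additional source and target parameters:

```json
{
  "Name": "order-processing-pipe",
  "RoleArn": "arn:aws:iam::111122223333:role/example-pipe-role",
  "Source": "arn:aws:sqs:us-east-1:111122223333:orders-queue",
  "Enrichment": "arn:aws:events:us-east-1:111122223333:api-destination/get-customer/abcd1234",
  "Target": "arn:aws:states:us-east-1:111122223333:stateMachine:ProcessOrder"
}
```

You would pass this file to `aws pipes create-pipe --cli-input-json file://pipe.json`.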

# Amazon EventBridge Pipes concepts
<a name="pipes-concepts"></a>

Here's a closer look at the basic components of EventBridge Pipes.

## Pipe
<a name="pipes-concepts-pipe"></a>

A pipe routes events from a single source to a single target. The pipe also includes the ability to filter for specific events, and to perform enrichments on the event data before it is sent to the target.

![A pipe routes filtered events to the specified target, with optional enrichment steps.](https://docs.aws.amazon.com/eventbridge/latest/userguide/images/pipes-overview-detailed_eventbridge_architectural.svg)


## Source
<a name="pipes-sources"></a>

EventBridge Pipes receives event data from a variety of sources, applies optional filters and enrichment to that data, and sends it to a target. If a source enforces an order on the events it sends to the pipe, that order is maintained through the entire process to the target.

For more information about sources, see [Amazon EventBridge Pipes sources](eb-pipes-event-source.md).

## Filters
<a name="pipes-filtering"></a>

A pipe can filter a given source’s events and then process only a subset of those events. To configure filtering on a pipe, you define an event pattern the pipe uses to determine which events to send to the target. 

You are only charged for those events that match the filter.

For more information, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).
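
For example, a pipe reading from an Amazon SQS queue could use a pattern like the following to process only high-priority orders. The `priority` field inside the message `body` is illustrative; your own message fields will differ:

```json
{
  "body": {
    "priority": ["high"]
  }
}
```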

## Enrichment
<a name="pipes-enrichment-overview"></a>

With the enrichment step of EventBridge Pipes, you can enhance the data from the source before sending it to the target. For example, you might receive *Ticket created* events that don’t include the full ticket data. Using enrichment, you can have a Lambda function call the `get-ticket` API for the full ticket details. The pipe can then send that information to a [target](eb-pipes-event-target.md).

For more information about enriching event data, see [Event enrichment in Amazon EventBridge Pipes](pipes-enrichment.md).
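
As a sketch of the *Ticket created* example, a Lambda enrichment function receives the batch of events as a JSON array and returns the payload the pipe sends to the target. The `lookup_ticket` helper below is a hypothetical stand-in for your own `get-ticket` API call:

```python
def lookup_ticket(ticket_id):
    # Hypothetical lookup; replace with a real call to your get-ticket API.
    return {"ticketId": ticket_id, "status": "OPEN", "priority": "P2"}

def handler(events, context=None):
    """Enrichment entry point: Pipes passes the batch as a list of events."""
    enriched = []
    for event in events:
        ticket_id = event.get("ticketId")
        # Merge the full ticket details into the original event.
        enriched.append({**event, "ticket": lookup_ticket(ticket_id)})
    return enriched
```

If the source batches events, the returned list should preserve the order of the input batch.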

## Target
<a name="pipes-targets"></a>

After the event data has been filtered and enriched, you can send it to a specific target, such as an Amazon Kinesis stream or an Amazon CloudWatch log group. For a list of the available targets, see [Amazon EventBridge Pipes targets](eb-pipes-event-target.md).

You can transform the data after it’s enhanced and before it’s sent by the pipe to the target. For more information, see [Amazon EventBridge Pipes input transformation](eb-pipes-input-transformation.md).

Multiple pipes, each with a different source, can send events to the same target.

You can also use pipes and event buses together to send events to multiple targets. A common use case is to create a pipe with an event bus as its target; the pipe sends events to the event bus, which then sends those events on to multiple targets. For example, you could create a pipe with a DynamoDB stream for a source, and an event bus as the target. The pipe receives events from the DynamoDB stream and sends them to the event bus, which then sends them on to multiple targets according to the rules you've specified on the event bus.

# Event source permissions for Amazon EventBridge Pipes
<a name="eb-pipes-permissions"></a>

When setting up a pipe, you can use an existing execution role, or have EventBridge create one for you with the needed permissions. The permissions EventBridge Pipes requires vary based on the source type, and are listed below. If you’re setting up your own execution role, you must add these permissions yourself.

**Note**  
If you’re unsure of the exact well-scoped permissions required to access the source, use the EventBridge Pipes console to create a new role, then inspect the actions listed in the policy.

**Topics**
+ [DynamoDB execution role permissions](#pipes-perms-ddb)
+ [Kinesis execution role permissions](#pipes-perms-ak)
+ [Amazon MQ execution role permissions](#pipes-perms-mq)
+ [Amazon MSK execution role permissions](#pipes-perms-msk)
+ [Self-managed Apache Kafka execution role permissions](#pipes-perms-kafka)
+ [Amazon SQS execution role permissions](#pipes-perms-sqs)
+ [Enrichment and target permissions](#pipes-perms-enhance-target)

## DynamoDB execution role permissions
<a name="pipes-perms-ddb"></a>

For DynamoDB Streams, EventBridge Pipes requires the following permissions to manage resources that are related to your DynamoDB stream.
+ [dynamodb:DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)
+ [dynamodb:GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)
+ [dynamodb:GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)
+ [dynamodb:ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)

To send records of failed batches to the pipe dead-letter queue, your pipe execution role needs the following permission:
+ [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)
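
As a sketch, an execution-role policy granting these permissions might look like the following. The stream and queue ARNs are hypothetical placeholders; scope `Resource` to your own stream and dead-letter queue:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/orders/stream/*"
    },
    {
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:111122223333:pipe-dlq"
    }
  ]
}
```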

## Kinesis execution role permissions
<a name="pipes-perms-ak"></a>

For Kinesis, EventBridge Pipes requires the following permissions to manage resources that are related to your Kinesis data stream.
+ [kinesis:DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)
+ [kinesis:DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)
+ [kinesis:GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)
+ [kinesis:GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
+ [kinesis:ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)
+ [kinesis:ListStreams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html)
+ [kinesis:SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)

To send records of failed batches to the pipe dead-letter queue, your pipe execution role needs the following permission:
+ [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html)

## Amazon MQ execution role permissions
<a name="pipes-perms-mq"></a>

For Amazon MQ, EventBridge Pipes requires the following permissions to manage resources that are related to your Amazon MQ message broker.
+ [mq:DescribeBroker](https://docs.aws.amazon.com/amazon-mq/latest/api-reference/brokers-broker-id.html#brokers-broker-id-http-methods)
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

## Amazon MSK execution role permissions
<a name="pipes-perms-msk"></a>

For Amazon MSK, EventBridge requires the following permissions to manage resources that are related to your Amazon MSK topic.

**Note**  
If you're using IAM role-based authentication, your execution role needs the permissions listed in [IAM role-based authentication](eb-pipes-msk.md#pipes-msk-permissions-iam-policy) in addition to the ones listed below.
+ [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html#v2-clusters-clusterarnget)
+ [kafka:GetBootstrapBrokers](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html#clusters-clusterarn-bootstrap-brokersget)
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

## Self-managed Apache Kafka execution role permissions
<a name="pipes-perms-kafka"></a>

For self-managed Apache Kafka, EventBridge requires the following permissions to manage resources that are related to your self-managed Apache Kafka cluster.

### Required permissions
<a name="pipes-perms-kafka-req"></a>

To create and store logs in a log group in Amazon CloudWatch Logs, your pipe must have the following permissions in its execution role:
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

### Optional permissions
<a name="pipes-perms-kafka-optional"></a>

Your pipe might also need permissions to:
+ Describe your Secrets Manager secret.
+ Access your AWS Key Management Service (AWS KMS) customer managed key.
+ Access your Amazon VPC.

### Secrets Manager and AWS KMS permissions
<a name="pipes-perms-kafka-sm-kms"></a>

Depending on the type of access control that you're configuring for your Apache Kafka brokers, your pipe might need permission to access your Secrets Manager secret or to decrypt your AWS KMS customer managed key. To access these resources, your pipe's execution role must have the following permissions:
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html)
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html)

### VPC permissions
<a name="pipes-perms-kafka-vpc"></a>

If only users within a VPC can access your self-managed Apache Kafka cluster, your pipe must have permission to access your Amazon VPC resources. These resources include your VPC, subnets, security groups, and network interfaces. To access these resources, your pipe's execution role must have the following permissions:
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## Amazon SQS execution role permissions
<a name="pipes-perms-sqs"></a>

For Amazon SQS, EventBridge requires the following permissions to manage resources that are related to your Amazon SQS queue.
+ [sqs:ReceiveMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html)
+ [sqs:DeleteMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessage.html)
+ [sqs:GetQueueAttributes](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html)
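
For example, an execution-role policy statement granting these permissions might look like the following sketch; the queue ARN is a hypothetical placeholder:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:us-east-1:111122223333:orders-queue"
    }
  ]
}
```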

## Enrichment and target permissions
<a name="pipes-perms-enhance-target"></a>

To make API calls on the resources that you own, EventBridge Pipes needs appropriate permissions. EventBridge Pipes uses the IAM role that you specify on the pipe for enrichment and target calls, using the IAM principal `pipes.amazonaws.com`.
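
This means the execution role's trust policy must allow the EventBridge Pipes service principal to assume it. A minimal sketch follows; in practice you can scope it further with condition keys such as `aws:SourceArn`:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "pipes.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```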

# Creating an Amazon EventBridge pipe
<a name="eb-pipes-create"></a>

EventBridge Pipes enables you to create point-to-point integrations between sources and targets, including advanced event transformations and enrichment. 

To create an EventBridge pipe, you perform the following steps: 

1. [Specifying a source](#pipes-configure-source)

1. [Configuring event filtering (optional)](#pipes-configure-filtering)

1. [Defining event enrichment (optional)](#pipes-define-enrichment)

1. [Configuring a target](#pipes-configure-target)

1. [Configuring the pipe settings](#pipes-configure-pipe-settings)

To quickly set up a sample pipe, see [Getting started: Create an Amazon EventBridge pipe](pipes-get-started.md). This topic uses CloudFormation to deploy a pipe and its associated resources, and walks you through an overview of a pipe's capabilities.

For information on how to create a pipe using the AWS CLI, see [create-pipe](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/pipes/create-pipe.html) in the *AWS CLI Command Reference*.

## Specifying a source
<a name="pipes-configure-source"></a>

To start, specify the source from which you want the pipe to receive events.

**To specify a pipe source by using the console**

1. Open the Amazon EventBridge console at [https://console.aws.amazon.com/events/](https://console.aws.amazon.com/events/).

1. On the navigation pane, choose **Pipes**.

1. Choose **Create pipe**.

1. Enter a name for the pipe.

1. (Optional) Add a description for the pipe.

1. On the **Build pipe** tab, for **Source**, choose the type of source you want to specify for this pipe, and configure the source.

   Configuration properties differ based on the type of source you choose:

------
#### [ Confluent ]

**To configure a Confluent Cloud stream as a source, by using the console**

   1. For **Source**, choose **Confluent Cloud**.

   1. For **Bootstrap servers**, enter the `host:port` pair addresses of your brokers.

   1. For **Topic name**, enter the name of the topic that the pipe will read from.

   1. (Optional) For **VPC**, choose the VPC that you want. Then, for **VPC subnets**, choose the desired subnets. For **VPC security groups**, choose the security groups.

   1. For **Authentication - optional**, turn on **Use Authentication** and do the following:

      1. For **Authentication method**, choose the authentication type.

      1. For **Secret key**, choose the secret key.

      For more information, see [Authenticate to Confluent Cloud resources](https://docs.confluent.io/cloud/current/access-management/authenticate/overview.html) in the Confluent documentation.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Starting position**, choose one of the following:
         + **Latest** – Start reading the stream with the most recent record in the shard.
         + **Trim horizon** – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.

      1. For **Batch size - optional**, enter a maximum number of records for each batch. The default value is 100.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

------
#### [ DynamoDB ]

**To configure a DynamoDB source by using the console**

   1. For **Source**, choose **DynamoDB**.

   1. For **DynamoDB stream**, choose the stream you want to use as a source.

   1. For **Starting position**, choose one of the following:
      + **Latest** – Start reading the stream with the most recent record in the shard.
      + **Trim horizon** – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Batch size - optional**, enter a maximum number of records for each batch. The default value is 10.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

      1. For **Concurrent batches per shard - optional**, enter the number of batches from the same shard that can be read at the same time.

      1. For **On partial batch item failure**, choose the following:
         + **AUTOMATIC_BISECT** – Halve each batch and retry each half until all the records are processed or there is one failed message remaining in the batch.
**Note**  
If you don't choose **AUTOMATIC_BISECT**, you can return specific failed records and only those are retried.

------
#### [ Kinesis ]

**To configure a Kinesis source by using the console**

   1. For **Source**, choose **Kinesis**.

   1. For **Kinesis stream**, choose the stream that you want to use as a source.

   1. For **Starting position**, choose one of the following:
      + **Latest** – Start reading the stream with the most recent record in the shard.
      + **Trim horizon** – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.
      + **At timestamp** – Start reading the stream from a specified time. Under **Timestamp**, enter a date and time using the YYYY/MM/DD and hh:mm:ss formats.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Batch size - optional**, enter a maximum number of records for each batch. The default value is 10.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

      1. For **Concurrent batches per shard - optional**, enter the number of batches from the same shard that can be read at the same time.

      1. For **On partial batch item failure**, choose the following:
         + **AUTOMATIC_BISECT** – Halve each batch and retry each half until all the records are processed or there is one failed message remaining in the batch.
**Note**  
If you don't choose **AUTOMATIC_BISECT**, you can return specific failed records and only those are retried.

------
#### [ Amazon MQ ]

**To configure an Amazon MQ source by using the console**

   1. For **Source**, choose **Amazon MQ**.

   1. For **Amazon MQ broker**, choose the broker you want to use as a source.

   1. For **Queue name**, enter the name of the queue that the pipe will read from.

   1. For **Authentication Method**, choose **BASIC_AUTH**.

   1. For **Secret key**, choose the secret key.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Batch size - optional**, enter a maximum number of messages for each batch. The default value is 100.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

------
#### [ Amazon MSK ]

**To configure an Amazon MSK source by using the console**

   1. For **Source**, choose **Amazon MSK**.

   1. For **Amazon MSK cluster**, choose the cluster that you want to use.

   1. For **Topic name**, enter the name of the topic that the pipe will read from.

   1. (Optional) For **Consumer Group ID - optional**, enter the ID of the consumer group you want the pipe to join.

   1. (Optional) For **Authentication - optional**, turn on **Use Authentication** and do the following:

      1. For **Authentication method**, choose the type you want.

      1. For **Secret key**, choose the secret key.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Batch size - optional**, enter a maximum number of records for each batch. The default value is 100.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

      1. For **Starting position**, choose one of the following:
         + **Latest** – Start reading the topic with the most recent record in the shard.
         + **Trim horizon** – Start reading the topic with the last untrimmed record in the shard. This is the oldest record in the shard.
**Note**  
**Trim horizon** is the same as **Earliest** for Apache Kafka.

------
#### [ Self-managed Apache Kafka ]

**To configure a self-managed Apache Kafka source by using the console**

   1. For **Source**, choose **Self-managed Apache Kafka**.

   1. For **Bootstrap servers**, enter the `host:port` pair addresses of your brokers.

   1. For **Topic name**, enter the name of the topic that the pipe will read from.

   1. (Optional) For **VPC**, choose the VPC that you want. Then, for **VPC subnets**, choose the desired subnets. For **VPC security groups**, choose the security groups.

   1. (Optional) For **Authentication - optional**, turn on **Use Authentication** and do the following:

      1. For **Authentication method**, choose the authentication type.

      1. For **Secret key**, choose the secret key.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Starting position**, choose one of the following:
         + **Latest** – Start reading the stream with the most recent record in the shard.
         + **Trim horizon** – Start reading the stream with the last untrimmed record in the shard. This is the oldest record in the shard.

      1. For **Batch size - optional**, enter a maximum number of records for each batch. The default value is 100.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

------
#### [ Amazon SQS ]

**To configure an Amazon SQS source by using the console**

   1. For **Source**, choose **SQS**.

   1. For **SQS queue**, choose the queue you want to use.

   1. (Optional) For **Additional setting - optional**, do the following:

      1. For **Batch size - optional**, enter a maximum number of records for each batch. The default value is 100.

      1. For **Batch window - optional**, enter a maximum number of seconds to gather records before proceeding.

------

## Configuring event filtering (optional)
<a name="pipes-configure-filtering"></a>

You can add filtering to your pipe so you’re sending only a subset of events from your source to the target.

**To configure filtering by using the console**

1. Choose **Filtering**.

1. Under **Sample event - optional**, you’ll see a sample event that you can use to build your event pattern, or you can enter your own event by choosing **Enter your own**.

1. Under **Event pattern**, enter the event pattern that you want to use to filter the events. For more information about constructing filters, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

   The following is an example event pattern that only sends events with the value **Seattle** in the **City** field.

   ```
   {
     "data": {
       "City": ["Seattle"]
     }
   }
   ```

Now that events are being filtered, you can add optional enrichment and a target for the pipe.
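
As an illustration of how such a pattern is evaluated, here is a minimal Python sketch of exact-value matching. It covers only this simple case, not the full EventBridge filtering syntax (prefix, numeric, exists, and so on), and it is not the service implementation:

```python
def matches_pattern(event: dict, pattern: dict) -> bool:
    """Check an event against a simple exact-match pattern: nested
    keys mirror the event structure, and each leaf is a list of
    allowed values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches_pattern(event[key], expected):
                return False
        elif event[key] not in expected:
            # Pattern leaves are lists of allowed values.
            return False
    return True

pattern = {"data": {"City": ["Seattle"]}}
print(matches_pattern({"data": {"City": "Seattle"}}, pattern))   # True
print(matches_pattern({"data": {"City": "Portland"}}, pattern))  # False
```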

## Defining event enrichment (optional)
<a name="pipes-define-enrichment"></a>

You can send the event data for enrichment to a Lambda function, AWS Step Functions state machine, Amazon API Gateway, or API destination.

**To select enrichment**

1. Choose **Enrichment**.

1. Under **Details**, for **Service**, select the service and related settings you want to use for enrichment.

You can also transform the data before sending it to be enhanced.

**(Optional) To define the input transformer**

1. Choose **Enrichment Input Transformer - optional**.

1. For **Sample events/Event Payload**, choose the sample event type.

1. For **Transformer**, enter the transformer syntax, such as `"Event happened at <$.detail.field>."` where `<$.detail.field>` is a reference to a field from the sample event. You can also double-click a field from the sample event to add it to the transformer.

1. For **Output**, verify that the output looks like you want it to.

Now that the data has been filtered and enhanced, you must define a target to send the event data to.
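
The placeholder substitution described above can be sketched in Python. This is a simplified illustration of the `<$.path>` syntax, not the service implementation; it handles only simple dotted paths:

```python
import re

def apply_transformer(template: str, event: dict) -> str:
    """Substitute <$.a.b> placeholders in the template with values
    looked up from the event by following the dotted path."""
    def resolve(match):
        value = event
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)
    return re.sub(r"<\$\.([\w.]+)>", resolve, template)

event = {"detail": {"field": "2024-05-01T12:00:00Z"}}
print(apply_transformer("Event happened at <$.detail.field>.", event))
# Event happened at 2024-05-01T12:00:00Z.
```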

## Configuring a target
<a name="pipes-configure-target"></a>

**To configure a target**

1. Choose **Target**.

1. Under **Details**, for **Target service**, choose the target. The fields that display vary depending on the target that you choose. Enter information specific to this target type, as needed.

You can also transform the data before sending it to the target.

**(Optional) To define the input transformer**

1. Choose **Target Input Transformer - optional**.

1. For **Sample events/Event Payload**, choose the sample event type.

1. For **Transformer**, enter the transformer syntax, such as `"Event happened at <$.detail.field>."` where `<$.detail.field>` is a reference to a field from the sample event. You can also double-click a field from the sample event to add it to the transformer.

1. For **Output**, verify that the output looks like you want it to.

Now that the pipe is configured, make sure that its settings are configured correctly.

## Configuring the pipe settings
<a name="pipes-configure-pipe-settings"></a>

A pipe is active by default, but you can deactivate it. You can also specify the permissions of the pipe, set up pipe logging, and add tags.

**To configure the pipe settings**

1. Choose the **Pipe settings** tab.

1. By default, newly created pipes are active. If you want to create an inactive pipe, under **Activation**, for **Activate pipe**, turn off **Active**.

1. Under **Permissions**, for **Execution role**, do one of the following:

   1. To have EventBridge create a new execution role for this pipe, choose **Create a new role for this specific resource.** Under **Role name**, you can optionally edit the role name.

   1. To use an existing execution role, choose **Use existing role**. Under **Role name**, choose the role.

1. (Optional) If you have specified a Kinesis or DynamoDB stream as the pipe source, you can configure a retry policy and dead-letter queue (DLQ).

   For **Retry policy and Dead-letter queue - optional**, do the following:

   Under **Retry policy**, do the following:

   1. If you want to turn on retry policies, turn on **Retry**. By default, newly created pipes don't have a retry policy turned on. 

   1. For **Maximum age of event**, enter a value between one minute (00:01) and 24 hours (24:00).

   1. For **Retry attempts**, enter a number between 0 and 185.

   1. If you want to use a dead-letter queue (DLQ), turn on **Dead-letter queue**, choose the method of your choice, and choose the queue or topic you'd like to use. By default, newly created pipes don't use a DLQ. 

1. Choose the KMS key for EventBridge to use when encrypting pipe data.

   For more information on how EventBridge uses KMS keys, see [Encryption at rest](eb-data-protection.md#eb-encryption-at-rest).
   + Choose **Use AWS owned key** for EventBridge to encrypt the data using an AWS owned key.

     This AWS owned key is a KMS key that EventBridge owns and manages for use in multiple AWS accounts. In general, unless you are required to audit or control the encryption key that protects your resources, an AWS owned key is a good choice. 

     This is the default.
   + Choose **Use customer managed key** for EventBridge to encrypt the data using the customer managed key that you specify or create.

     Customer managed keys are KMS keys in your AWS account that you create, own, and manage. You have full control over these KMS keys.

     1. Specify an existing customer managed key, or choose **Create a new KMS key**.

       EventBridge displays the key status and any key aliases that have been associated with the specified customer managed key.

1. (Optional) Under **Logs - optional**, you can set up how EventBridge Pipes sends logging information to supported services, including how to configure those logs. 

   For more information about logging pipe records, see [Logging Amazon EventBridge Pipes performance](eb-pipes-logs.md).

   CloudWatch Logs is selected as a log destination by default, as is the `ERROR` log level. So, by default, EventBridge Pipes creates a new CloudWatch log group to which it sends log records containing the `ERROR` level of detail.

   To have EventBridge Pipes send log records to any of the supported log destinations, do the following: 

   1. Under **Logs - optional**, choose the destinations to which you want log records delivered.

   1. For **Log level**, choose the level of information for EventBridge to include in log records. The `ERROR` log level is selected by default.

      For more information, see [Specifying EventBridge Pipes log level](eb-pipes-logs.md#eb-pipes-logs-level).

   1. Select **Include execution data** if you want EventBridge to include event payload information and service request and response information in log records.

      For more information, see [Including execution data in EventBridge Pipes logs](eb-pipes-logs.md#eb-pipes-logs-execution-data).

   1. Configure each log destination you selected:

      For CloudWatch Logs, under **CloudWatch logs**, do the following:
      + For **CloudWatch log group**, choose whether to have EventBridge create a new log group, select an existing log group, or specify the ARN of an existing log group.
      + For new log groups, edit the log group name as desired.

      CloudWatch Logs is selected by default.

      For Firehose stream logs, under **Firehose stream log**, select the Firehose stream. 

      For Amazon S3 logs, under **S3 logs** do the following:
      + Enter the name of the bucket to use as the log destination.
      + Enter the AWS account ID of the bucket owner.
      + Enter any prefix text you want used when EventBridge creates S3 objects.

        For more information, see [Organizing objects using prefixes](https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html) in the *Amazon Simple Storage Service User Guide*.
      + Choose how you want EventBridge to format S3 log records:
        + `json`: JSON 
        + `plain`: Plain text
        + `w3c`: [W3C extended logging file format](https://www.w3.org/TR/WD-logfile)

1. (Optional) Under **Tags - optional**, choose **Add new tag** and enter one or more tags for the rule. For more information, see [Tagging resources in Amazon EventBridge](eb-tagging.md).

1. Choose **Create pipe**.
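
The console steps above correspond to the `CreatePipe` API operation. The following boto3 sketch assembles an equivalent request for an SQS-to-SQS pipe with a filter; all names and ARNs are placeholders, and the parameter shapes should be confirmed against the Pipes API reference:

```python
import json
# import boto3  # uncomment to make the call; requires AWS credentials

def build_create_pipe_request(name, role_arn, source_arn, target_arn):
    """Assemble a CreatePipe request with an SQS source, an exact-match
    filter, and an active desired state."""
    return {
        "Name": name,
        "RoleArn": role_arn,
        "Source": source_arn,
        "Target": target_arn,
        "DesiredState": "RUNNING",
        "SourceParameters": {
            "FilterCriteria": {
                # Filter patterns are passed as JSON strings.
                "Filters": [{"Pattern": json.dumps({"data": {"City": ["Seattle"]}})}]
            },
            "SqsQueueParameters": {"BatchSize": 10},
        },
    }

request = build_create_pipe_request(
    "demo-pipe",
    "arn:aws:iam::111122223333:role/demo-pipe-role",
    "arn:aws:sqs:us-east-1:111122223333:demo-source-queue",
    "arn:aws:sqs:us-east-1:111122223333:demo-target-queue",
)
# boto3.client("pipes").create_pipe(**request)
```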

## Validating configuration parameters
<a name="pipes-validation"></a>

After a pipe is created, EventBridge validates the following configuration parameters:
+ **IAM role** – Because the source of a pipe can't be changed after the pipe is created, EventBridge verifies that the provided IAM role can access the source.
**Note**  
EventBridge doesn't perform the same validation for enrichments or targets because they can be updated after the pipe is created.
+ **Batching** – EventBridge validates that the batch size of the source doesn't exceed the maximum batch size of the target. If it does, EventBridge requires a lower batch size. Additionally, if a target doesn't support batching, you can't configure batching in EventBridge for the source.
+ **Enrichments** – EventBridge validates that the batch size for API Gateway and API destination enrichments is 1 because only batch sizes of 1 are supported.
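
As an illustration, the batching check could be expressed like this. The helper is hypothetical (the service performs this validation for you), and the limit values are placeholders:

```python
def validate_batch_size(source_batch_size, target_max_batch_size):
    """Raise if the configured source batch size exceeds the target's
    maximum. A target that doesn't support batching effectively has a
    maximum batch size of 1."""
    if source_batch_size > target_max_batch_size:
        raise ValueError(
            f"batch size {source_batch_size} exceeds the target maximum "
            f"of {target_max_batch_size}; configure a lower batch size"
        )

validate_batch_size(10, 10)  # passes
validate_batch_size(1, 1)    # API Gateway and API destination enrichments
```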

# Starting or stopping an Amazon EventBridge pipe
<a name="pipes-start-stop"></a>

By default, a pipe is `Running` and processes events when it's created.

If you create a pipe with Amazon SQS, Kinesis, or DynamoDB sources, pipe creation can typically take a minute or two.

If you create a pipe with Amazon MSK, self-managed Apache Kafka, or Amazon MQ sources, pipe creation can take up to ten minutes.

**To create a pipe without processing events using the console**
+ Turn off the **Activate pipe** setting.

**To create a pipe without processing events programmatically**
+ In your API call, set the `DesiredState` to `Stopped`.

**To start or stop an existing pipe using the console**
+ On the **Pipes settings** tab, under **Activation**, for **Activate pipe**, turn **Active** on or off.

**To start or stop an existing pipe programmatically**
+ In your API call, set the `DesiredState` parameter to either `RUNNING` or `STOPPED`.
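
With boto3, these state changes map to the `start_pipe` and `stop_pipe` operations on the `pipes` client. A small sketch (the pipe name is a placeholder):

```python
# import boto3  # requires AWS credentials to actually call the API

def pipe_state_call(pipe_name: str, run: bool):
    """Return the Pipes API operation and parameters that move an
    existing pipe to the requested state."""
    operation = "StartPipe" if run else "StopPipe"
    return operation, {"Name": pipe_name}

operation, params = pipe_state_call("demo-pipe", run=False)
print(operation, params)  # StopPipe {'Name': 'demo-pipe'}
# boto3.client("pipes").stop_pipe(Name="demo-pipe")
# boto3.client("pipes").start_pipe(Name="demo-pipe")
```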

There can be a delay between when a pipe is `STOPPED` and when it no longer processes events: 
+ For Amazon SQS and stream sources, this delay is typically less than two minutes.
+ For Amazon MQ and Apache Kafka sources, this delay may be up to fifteen minutes.

# Amazon EventBridge Pipes sources
<a name="eb-pipes-event-source"></a>

EventBridge Pipes receives event data from a variety of sources, applies optional filters and enrichments to that data, and sends it to a destination.

If a source enforces order on the events sent to EventBridge Pipes, that order is maintained throughout the entire process to the destination.

The following AWS services can be specified as sources for EventBridge Pipes:
+ [Amazon DynamoDB stream](eb-pipes-dynamodb.md)
+ [Amazon Kinesis stream](eb-pipes-kinesis.md)
+ [Amazon MQ broker](eb-pipes-mq.md)
+ [Amazon MSK stream](eb-pipes-msk.md)
+ [Amazon SQS queue](eb-pipes-sqs.md)
+ [Apache Kafka stream](eb-pipes-kafka.md)

  When you specify an Apache Kafka stream as a pipe source, you can specify an Apache Kafka stream that you manage yourself, or one managed by a third-party provider such as:
  + [https://www.confluent.io/](https://www.confluent.io/)
  + [https://www.cloudkarafka.com/](https://www.cloudkarafka.com/)
  + [https://redpanda.com/](https://redpanda.com/)

# Amazon DynamoDB stream as a source for EventBridge Pipes
<a name="eb-pipes-dynamodb"></a>

You can use EventBridge Pipes to receive records in a DynamoDB stream. You can then optionally filter or enhance these records before sending them to a target for processing. There are settings specific to Amazon DynamoDB Streams that you can choose when setting up the pipe. EventBridge Pipes maintains the order of records from the data stream when sending that data to the destination.

**Important**  
Disabling a DynamoDB stream that is the source of a pipe results in that pipe becoming unusable, even if you then re-enable the stream. This happens because:  
You cannot stop, start, or update a pipe whose source is disabled.
You cannot update a pipe with a new source after creation. When you re-enable a DynamoDB stream, that stream is assigned a new Amazon Resource Name (ARN), and is no longer associated with your pipe.
If you do re-enable the DynamoDB stream, you will then need to create a new pipe using the stream's new ARN.

**Example event**

The following sample event shows the information that's received by the pipe. You can use this event to create and filter your event patterns, or to define input transformation. Not all of the fields can be filtered. For more information about which fields you can filter, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

```
[
  {
    "eventID": "1",
    "eventVersion": "1.0",
    "dynamodb": {
      "Keys": {
        "Id": {
          "N": "101"
        }
      },
      "NewImage": {
        "Message": {
          "S": "New item!"
        },
        "Id": {
          "N": "101"
        }
      },
      "StreamViewType": "NEW_AND_OLD_IMAGES",
      "SequenceNumber": "111",
      "SizeBytes": 26
    },
    "awsRegion": "us-west-2",
    "eventName": "INSERT",
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable",
    "eventSource": "aws:dynamodb"
  },
  {
    "eventID": "2",
    "eventVersion": "1.0",
    "dynamodb": {
      "OldImage": {
        "Message": {
          "S": "New item!"
        },
        "Id": {
          "N": "101"
        }
      },
      "SequenceNumber": "222",
      "Keys": {
        "Id": {
          "N": "101"
        }
      },
      "SizeBytes": 59,
      "NewImage": {
        "Message": {
          "S": "This item has changed"
        },
        "Id": {
          "N": "101"
        }
      },
      "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "awsRegion": "us-west-2",
    "eventName": "MODIFY",
    "eventSourceARN": "arn:aws:dynamodb:us-east-1:111122223333:table/EventSourceTable",
    "eventSource": "aws:dynamodb"
  }
]
```

## Polling and batching streams
<a name="pipes-ddb-polling"></a>

EventBridge polls shards in your DynamoDB stream for records at a base rate of four times per second. When records are available, EventBridge processes the event and waits for the result. If processing succeeds, EventBridge resumes polling until it receives more records.

By default, EventBridge invokes your pipe as soon as records are available. If the batch that EventBridge reads from the source has only one record in it, only one event is processed. To avoid processing a small number of records, you can tell the pipe to buffer records for up to five minutes by configuring a batching window. Before processing the events, EventBridge continues to read records from the source until it has gathered a full batch, the batching window expires, or the batch reaches the payload limit of 6 MB.
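
The three flush conditions above can be sketched as a simple predicate. The thresholds come from this section; the helper itself is illustrative:

```python
MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # 6 MB payload limit

def should_flush(record_count, batch_bytes, window_elapsed_s,
                 batch_size=100, window_s=300):
    """Return True when a buffered batch should be dispatched: the
    batch is full, the batching window (up to five minutes) expired,
    or the payload limit was reached."""
    return (record_count >= batch_size
            or window_elapsed_s >= window_s
            or batch_bytes >= MAX_PAYLOAD_BYTES)

print(should_flush(100, 1024, 10))            # True: full batch
print(should_flush(5, 1024, 301))             # True: window expired
print(should_flush(5, MAX_PAYLOAD_BYTES, 1))  # True: payload limit
print(should_flush(5, 1024, 10))              # False: keep buffering
```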

**Important**  
The pipe delivers the stream records from DynamoDB to the target at least once. To ensure that no records are dropped, we suggest setting a retry policy with a maximum age shorter than the retention period of the DynamoDB stream. Generally, DynamoDB streams retain events for 24 hours.

You can also increase concurrency by processing multiple batches from each shard in parallel. EventBridge can process up to 10 batches in each shard simultaneously. If you increase the number of concurrent batches per shard, EventBridge still ensures in-order processing at the partition key level.

Configure the `ParallelizationFactor` setting to process one shard of a Kinesis or DynamoDB data stream with more than one Pipe execution simultaneously. You can specify the number of concurrent batches that EventBridge polls from a shard via a parallelization factor from 1 (default) to 10. For example, when you set `ParallelizationFactor` to 2, you can have 200 concurrent EventBridge Pipe executions at maximum to process 100 Kinesis data shards. This helps scale up the processing throughput when the data volume is volatile and the `IteratorAge` is high. Note that parallelization factor will not work if you are using Kinesis aggregation. 
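
The concurrency math in the example above is simply shards multiplied by the parallelization factor:

```python
def max_concurrent_executions(shard_count: int, parallelization_factor: int = 1) -> int:
    """Maximum concurrent pipe executions for a stream source: one
    execution per concurrent batch per shard (factor ranges 1-10)."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("ParallelizationFactor must be between 1 and 10")
    return shard_count * parallelization_factor

print(max_concurrent_executions(100, 2))  # 200
```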

## Polling and stream starting position
<a name="pipes-ddb-stream-start-position"></a>

Be aware that stream source polling during pipe creation and updates is eventually consistent.
+ During pipe creation, it may take several minutes to start polling events from the stream.
+ During pipe updates to the source polling configuration, it may take several minutes to stop and restart polling events from the stream. 

This means that if you specify `LATEST` as the starting position for the stream, the pipe could miss events sent during pipe creation or updates. To ensure no events are missed, specify the stream starting position as `TRIM_HORIZON`.

## Reporting batch item failures
<a name="pipes-ddb-batch-failures"></a>

When EventBridge consumes and processes streaming data from a source, by default it checkpoints to the highest sequence number of a batch, but only when the batch is a complete success. To avoid reprocessing successfully processed messages in a failed batch, you can configure your enrichment or target to return an object indicating which messages succeeded and which failed. This is called a partial batch response.

For more information, see [Partial batch failure](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure).

### Success and failure conditions
<a name="pipes-ddb-batch-failures-conditions"></a>

If you return any of the following, EventBridge treats a batch as a complete success:
+ An empty `batchItemFailure` list
+ A null `batchItemFailure` list
+ An empty `EventResponse`
+ A null `EventResponse`

If you return any of the following, EventBridge treats a batch as a complete failure:
+ An empty string `itemIdentifier`
+ A null `itemIdentifier`
+ An `itemIdentifier` with a bad key name

EventBridge retries failures based on your retry strategy.
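
For example, a Lambda enrichment or target can return a partial batch response like the following sketch, where each `itemIdentifier` is the sequence number of a failed record. The `process` logic and its failure condition are hypothetical:

```python
def process(record):
    """Hypothetical business logic: fail on one marker sequence number."""
    if record["dynamodb"]["SequenceNumber"] == "222":
        raise ValueError("cannot process this record")

def handler(records, context=None):
    """Return a partial batch response: records listed under
    batchItemFailures are retried; an empty list marks the whole
    batch as successful."""
    failures = []
    for record in records:
        try:
            process(record)
        except Exception:
            failures.append(
                {"itemIdentifier": record["dynamodb"]["SequenceNumber"]}
            )
    return {"batchItemFailures": failures}

batch = [{"dynamodb": {"SequenceNumber": "111"}},
         {"dynamodb": {"SequenceNumber": "222"}}]
print(handler(batch))  # {'batchItemFailures': [{'itemIdentifier': '222'}]}
```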

# Amazon Kinesis stream as a source for EventBridge Pipes
<a name="eb-pipes-kinesis"></a>

You can use EventBridge Pipes to receive records in a Kinesis data stream. You can then optionally filter or enhance these records before sending them to one of the available destinations for processing. There are settings specific to Kinesis that you can choose when setting up the pipe. EventBridge Pipes maintains the order of records from the data stream when sending that data to the destination.

A Kinesis data stream is a set of [shards](https://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html#shard). Each shard contains a sequence of data records. A **consumer** is an application that processes the data from a Kinesis data stream. You can map an EventBridge Pipe to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with [enhanced fan-out](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html).

For standard iterators, EventBridge uses the HTTP protocol to poll each shard in your Kinesis stream for records. The pipe shares the read throughput with other consumers of the shard.

To minimize latency and maximize read throughput, you can create a data stream consumer with enhanced fan-out. Stream consumers get a dedicated connection to each shard that doesn't impact other applications reading from the stream. The dedicated throughput can help if you have many applications reading the same data, or if you're reprocessing a stream with large records. Kinesis pushes records to EventBridge over HTTP/2. For information about Kinesis data streams, see [Reading Data from Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html).

**Example event**

The following sample event shows the information that is received by the pipe. You can use this event to create and filter your event patterns, or to define input transformation. Not all of the fields can be filtered. For more information about which fields you can filter, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

```
[
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "approximateArrivalTimestamp": 1545084650.987,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  },
  {
    "kinesisSchemaVersion": "1.0",
    "partitionKey": "1",
    "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
    "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
    "approximateArrivalTimestamp": 1545084711.166,
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
  }
]
```
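
The `data` field of each record is base64-encoded. A quick Python sketch decodes the payload from the first sample record above:

```python
import base64

def decode_record_data(record: dict) -> bytes:
    """Decode the base64-encoded payload of a Kinesis pipe record."""
    return base64.b64decode(record["data"])

record = {"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="}
print(decode_record_data(record))  # b'Hello, this is a test.'
```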

## Polling and batching streams
<a name="pipes-ak-polling"></a>

EventBridge polls shards in your Kinesis stream for records at a base rate of once per second. When records are available, EventBridge processes the event and waits for the result. If processing succeeds, EventBridge resumes polling until it receives more records.

By default, EventBridge invokes your pipe as soon as records are available. If the batch that EventBridge reads from the source has only one record in it, only one event is processed. To avoid processing a small number of records, you can tell the pipe to buffer records for up to five minutes by configuring a batching window. Before processing the events, EventBridge continues to read records from the source until it has gathered a full batch, the batching window expires, or the batch reaches the payload limit of 6 MB.

You can also increase concurrency by processing multiple batches from each shard in parallel. EventBridge can process up to 10 batches in each shard simultaneously. If you increase the number of concurrent batches per shard, EventBridge still ensures in-order processing at the partition key level.

Configure the `ParallelizationFactor` setting to process one shard of a Kinesis or DynamoDB data stream with more than one Pipe execution simultaneously. You can specify the number of concurrent batches that EventBridge polls from a shard via a parallelization factor from 1 (default) to 10. For example, when you set `ParallelizationFactor` to 2, you can have 200 concurrent EventBridge Pipe executions at maximum to process 100 Kinesis data shards. This helps scale up the processing throughput when the data volume is volatile and the `IteratorAge` is high. Note that parallelization factor will not work if you are using Kinesis aggregation. 

## Polling and stream starting position
<a name="pipes-ak-stream-start-position"></a>

Be aware that stream source polling during pipe creation and updates is eventually consistent.
+ During pipe creation, it may take several minutes to start polling events from the stream.
+ During pipe updates to the source polling configuration, it may take several minutes to stop and restart polling events from the stream. 

This means that if you specify `LATEST` as the starting position for the stream, the pipe could miss events sent during pipe creation or updates. To ensure no events are missed, specify the stream starting position as `TRIM_HORIZON` or `AT_TIMESTAMP`.

## Reporting batch item failures
<a name="pipes-ak-batch-failures"></a>

When EventBridge consumes and processes streaming data from a source, by default it checkpoints to the highest sequence number of a batch, but only when the batch is a complete success. To avoid reprocessing successfully processed messages in a failed batch, you can configure your enrichment or target to return an object indicating which messages succeeded and which failed. This is called a partial batch response.

For more information, see [Partial batch failure](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure).

### Success and failure conditions
<a name="pipes-ak-batch-failures-conditions"></a>

If you return any of the following, EventBridge treats a batch as a complete success:
+ An empty `batchItemFailure` list
+ A null `batchItemFailure` list
+ An empty `EventResponse`
+ A null `EventResponse`

If you return any of the following, EventBridge treats a batch as a complete failure:
+ An empty string `itemIdentifier`
+ A null `itemIdentifier`
+ An `itemIdentifier` with a bad key name

EventBridge retries failures based on your retry strategy.

# Amazon MQ message broker as a source in EventBridge Pipes
<a name="eb-pipes-mq"></a>

You can use EventBridge Pipes to receive records from an Amazon MQ message broker. You can then optionally filter or enhance these records before sending them to one of the available destinations for processing. There are settings specific to Amazon MQ that you can choose when setting up a pipe. EventBridge Pipes maintains the order of the records from the message broker when sending that data to the destination.

Amazon MQ is a managed message broker service for [Apache ActiveMQ](https://activemq.apache.org/) and [RabbitMQ](https://www.rabbitmq.com/). A message broker enables software applications and components to communicate using different programming languages, operating systems, and formal messaging protocols with either topics or queues as event destinations.

Amazon MQ can also manage Amazon Elastic Compute Cloud (Amazon EC2) instances on your behalf by installing ActiveMQ or RabbitMQ brokers. After a broker is installed, it provides different network topologies and other infrastructure needs to your instances.

The Amazon MQ source has the following configuration restrictions:
+ **Cross account** – EventBridge doesn’t support cross-account processing. You can’t use EventBridge to process records from an Amazon MQ message broker that is in a different AWS account.
+ **Authentication** – For ActiveMQ, only the [ActiveMQ SimpleAuthenticationPlugin](https://activemq.apache.org/security#simple-authentication-plugin) is supported. For RabbitMQ, only the [PLAIN](https://www.rabbitmq.com/access-control.html#mechanisms) authentication mechanism is supported. To manage credentials, use AWS Secrets Manager. For more information about ActiveMQ authentication, see [Integrating ActiveMQ brokers with LDAP](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/security-authentication-authorization.html) in the Amazon MQ Developer Guide.
+ **Connection quota** – Brokers have a maximum number of allowed connections for each wire-level protocol. This quota is based on the broker instance type. For more information, see the [Brokers](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-limits.html#broker-limits) section of *Quotas in Amazon MQ* in the Amazon MQ Developer Guide.
+ **Connectivity** – You can create brokers in a public or private virtual private cloud (VPC). For private VPCs, your pipe needs access to the VPC to receive messages.
+ **Event destinations** – Only queue destinations are supported. However, you can use a virtual topic, which behaves as both a topic internally and as a queue externally when it interacts with your pipes. For more information, see [ Virtual Destinations](https://activemq.apache.org/virtual-destinations) on the Apache ActiveMQ website, and [Virtual Hosts](https://www.rabbitmq.com/vhosts.html) on the RabbitMQ website.
+ **Network topology** – For ActiveMQ, only one single-instance broker or standby broker is supported for each pipe. For RabbitMQ, only one single-instance broker or cluster deployment is supported for each pipe. Single-instance brokers require a failover endpoint. For more information about these broker deployment modes, see [Active MQ Broker Architecture](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-broker-architecture.html) and [Rabbit MQ Broker Architecture](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/rabbitmq-broker-architecture.html) in the Amazon MQ Developer Guide.
+ **Protocols** – Supported protocols depend on the Amazon MQ integration that you use.
  + For ActiveMQ integrations, EventBridge uses the OpenWire/Java Message Service (JMS) protocol to consume messages. Message consumption isn’t supported on any other protocol. EventBridge only supports the `TextMessage` and `BytesMessage` operations within the JMS protocol. For more information about the OpenWire protocol, see [OpenWire](https://activemq.apache.org/openwire.html) on the Apache ActiveMQ website.
  + For RabbitMQ integrations, EventBridge uses the AMQP 0-9-1 protocol to consume messages. No other protocols are supported for consuming messages. For more information about RabbitMQ's implementation of the AMQP 0-9-1 protocol, see [AMQP 0-9-1 Complete Reference Guide](https://www.rabbitmq.com/amqp-0-9-1-reference.html) on the RabbitMQ website.

EventBridge automatically supports the latest versions of ActiveMQ and RabbitMQ that Amazon MQ supports. For the latest supported versions, see [Amazon MQ release notes](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/amazon-mq-release-notes.html) in the Amazon MQ Developer Guide.

**Note**  
By default, Amazon MQ has a weekly maintenance window for brokers. During that window of time, brokers are unavailable. For brokers without standby, EventBridge won’t process messages until the window ends.

**Example events**

The following sample event shows the information that is received by the pipe. You can use this event to create and filter your event patterns, or to define input transformation. Not all of the fields can be filtered. For more information about which fields you can filter, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

**ActiveMQ**

```
[
  {
    "eventSource": "aws:amq",
    "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",    
    "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
    "messageType": "jms/text-message",
    "data": "QUJDOkFBQUE=",
    "connectionId": "myJMSCoID",
    "redelivered": false,
    "destination": {
      "physicalname": "testQueue"
    },
    "timestamp": 1598827811958,
    "brokerInTime": 1598827811958,
    "brokerOutTime": 1598827811959
  },
  {
    "eventSource": "aws:amq",
    "eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",        
    "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
    "messageType": "jms/bytes-message",
    "data": "3DTOOW7crj51prgVLQaGQ82S48k=",
    "connectionId": "myJMSCoID1",
    "persistent": false,
    "destination": {
      "physicalname": "testQueue"
    },
    "timestamp": 1598827811958,
    "brokerInTime": 1598827811958,
    "brokerOutTime": 1598827811959
  }
]
```

**RabbitMQ**

```
[
  {
    "eventSource": "aws:rmq",
    "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
    "eventSourceKey": "pizzaQueue::/",
    "basicProperties": {
      "contentType": "text/plain",
      "contentEncoding": null,
      "headers": {
        "header1": {
          "bytes": [
            118,
            97,
            108,
            117,
            101,
            49
          ]
        },
        "header2": {
          "bytes": [
            118,
            97,
            108,
            117,
            101,
            50
          ]
        },
        "numberInHeader": 10
      },
      "deliveryMode": 1,
      "priority": 34,
      "correlationId": null,
      "replyTo": null,
      "expiration": "60000",
      "messageId": null,
      "timestamp": "Jan 1, 1970, 12:33:41 AM",
      "type": null,
      "userId": "AIDACKCEVSQ6C2EXAMPLE",
      "appId": null,
      "clusterId": null,
      "bodySize": 80
    },
    "redelivered": false,
    "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
  }
]
```
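
Targets and enrichment steps must decode these records before using them: the `data` field is base64-encoded, and binary header values arrive as byte arrays. A minimal Python sketch (the helper name is ours; the field names come from the sample event above):

```
import base64
import json

def decode_rabbitmq_record(record):
    """Decode the base64 body and byte-array headers of a RabbitMQ pipe record."""
    body = base64.b64decode(record["data"]).decode("utf-8")
    headers = {}
    for name, value in record["basicProperties"]["headers"].items():
        # Binary header values arrive as {"bytes": [...]}; others are plain JSON values.
        if isinstance(value, dict) and "bytes" in value:
            headers[name] = bytes(value["bytes"]).decode("utf-8")
        else:
            headers[name] = value
    return {"body": json.loads(body), "headers": headers}

record = {
    "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
    "basicProperties": {
        "headers": {
            "header1": {"bytes": [118, 97, 108, 117, 101, 49]},
            "numberInHeader": 10,
        }
    },
}
decoded = decode_rabbitmq_record(record)
# decoded["headers"]["header1"] == "value1"; decoded["body"]["timeout"] == 0
```

The `json.loads` call assumes the producer sent a JSON body; for other payloads, keep the decoded bytes as-is.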

## Consumer group
<a name="pipes-mq-consumer-group"></a>

To interact with Amazon MQ, EventBridge creates a consumer group that can read from your Amazon MQ brokers. The consumer group is created with the same ID as the pipe UUID.

For Amazon MQ sources, EventBridge batches records together and sends them to the target in a single payload. To control batching behavior, you can configure the batching window and batch size. EventBridge pulls messages until one of the following occurs:
+ The processed records reach the payload size maximum of 6 MB.
+ The batching window expires.
+ The number of records reaches the full batch size. 

EventBridge converts your batch into a single payload and then invokes the target. Messages aren't persisted or deserialized. Instead, the consumer group retrieves them as a BLOB of bytes and then base64-encodes them into a JSON payload. If the pipe returns an error for any of the messages in a batch, EventBridge retries the entire batch until processing succeeds or the messages expire.
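
The three stop conditions above can be expressed as a simple predicate. This is only an illustrative sketch of the rule, not EventBridge's implementation; `batch_size` and `window_s` stand for your configured batch size and batching window:

```
MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # 6 MB payload size maximum

def should_flush(batch_bytes, batch_count, window_elapsed_s, batch_size, window_s):
    """Return True when a batch of MQ messages should be sent to the target."""
    return (batch_bytes >= MAX_PAYLOAD_BYTES      # payload size maximum reached
            or window_elapsed_s >= window_s       # batching window expired
            or batch_count >= batch_size)         # full batch size reached
```

Whichever condition is met first triggers delivery of the batch.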

## Network configuration
<a name="pipes-mq-vpc-config"></a>

By default, Amazon MQ brokers are created with the `PubliclyAccessible` flag set to false. The broker receives a public IP address only when `PubliclyAccessible` is set to true. For your pipe to have full access, the broker must either use a public endpoint or provide access to its VPC.

If your Amazon MQ broker isn't publicly accessible, EventBridge must have access to the Amazon Virtual Private Cloud (Amazon VPC) resources associated with your broker.
+ To access the VPC of your Amazon MQ brokers, EventBridge can use outbound internet access for the subnets of your source. For public subnets this must be a managed [NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html). For private subnets it can be a NAT gateway, or your own NAT. Ensure that the NAT has a public IP address and can connect to the internet.
+ EventBridge Pipes also supports event delivery through [AWS PrivateLink](https://aws.amazon.com/privatelink/), allowing you to send events from an event source located in an Amazon Virtual Private Cloud (Amazon VPC) to a Pipes target without traversing the public internet. You can use Pipes to poll from Amazon Managed Streaming for Apache Kafka (Amazon MSK), self-managed Apache Kafka, and Amazon MQ sources residing in a private subnet without the need to deploy an internet gateway, configure firewall rules, or set up proxy servers.

  To set up a VPC endpoint, see [Create a VPC endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws) in the *AWS PrivateLink User Guide*. For service name, select `com.amazonaws.region.pipes-data`.

Configure your Amazon VPC security groups with the following rules (at minimum):
+ Inbound rules – Allow all traffic on the Amazon MQ broker port for the security groups specified for your source.
+ Outbound rules – Allow all traffic on port 443 for all destinations. Allow all traffic on the Amazon MQ broker port for the security groups specified for your source.

  Broker ports include:
  + 61617 for Amazon MQ for ActiveMQ
  + 5671 for Amazon MQ for RabbitMQ

**Note**  
Your Amazon VPC configuration is discoverable through the [Amazon MQ API](https://docs.aws.amazon.com/amazon-mq/latest/api-reference/resources.html). You don't need to configure it during setup.

# Amazon Managed Streaming for Apache Kafka topic as a source in EventBridge Pipes
<a name="eb-pipes-msk"></a>

You can use EventBridge Pipes to receive records from an [Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) topic, including from the [Amazon MSK Serverless](https://aws.amazon.com/msk/features/msk-serverless/) cluster type. You can optionally filter or enhance these records before sending them to one of the available destinations for processing. There are settings specific to Amazon MSK that you can choose when setting up a pipe. EventBridge Pipes maintains the order of the records from the message broker when sending that data to the destination.

Amazon MSK is a fully managed service that you can use to build and run applications that use Apache Kafka to process streaming data. Amazon MSK simplifies the setup, scaling, and management of clusters running Apache Kafka. With Amazon MSK, you can configure your application for multiple Availability Zones and for security with AWS Identity and Access Management (IAM). Amazon MSK supports multiple open-source versions of Kafka.

Amazon MSK as a source operates similarly to using Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis. EventBridge internally polls for new messages from the source and then synchronously invokes the target. EventBridge reads the messages in batches and provides them to the target as an event payload. The maximum batch size is configurable. (The default is 100 messages.)

For Apache Kafka-based sources, EventBridge supports processing control parameters, such as batching windows and batch size.

EventBridge reads the messages sequentially for each partition. After EventBridge processes each batch, it commits the offsets of the messages in that batch. If the pipe's target returns an error for any of the messages in a batch, EventBridge retries the entire batch of messages until processing succeeds or the messages expire.

EventBridge sends the batch of messages in the event when it invokes the target. The event payload contains an array of messages. Each array item contains details of the Amazon MSK topic and partition identifier, together with a timestamp and a base64-encoded message.

**Example events**

The following sample event shows the information that is received by the pipe. You can use this event to create and filter your event patterns, or to define input transformations. Not all of the fields can be filtered. For more information about which fields you can filter, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

```
[
  {
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",    
    "eventSourceKey": "mytopic-0",
    "topic": "mytopic",
    "partition": "0",
    "offset": 15,
    "timestamp": 1545084650987,
    "timestampType": "CREATE_TIME",
    "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", 
    "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "headers": [
      {
        "headerKey": [
          104,
          101,
          97,
          100,
          101,
          114,
          86,
          97,
          108,
          117,
          101
        ]
      }
    ]
  }
]
```
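
The `value` field is base64-encoded, and Kafka header values arrive as byte arrays, so a target or enrichment step typically decodes them first. A short sketch using the sample record above:

```
import base64

record = {
    "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "headers": [{"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]}],
}

# The message body is base64-encoded.
value = base64.b64decode(record["value"]).decode("utf-8")

# Header values are byte arrays; convert each one to a string.
headers = {name: bytes(data).decode("utf-8")
           for header in record["headers"] for name, data in header.items()}

print(value)                 # Hello, this is a test.
print(headers["headerKey"])  # headerValue
```

The same decoding applies to the `key` field when your producer sets string keys.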

## Polling and stream starting position
<a name="pipes-msk-stream-start-position"></a>

Be aware that stream source polling during pipe creation and updates is eventually consistent.
+ During pipe creation, it may take several minutes to start polling events from the stream.
+ During pipe updates to the source polling configuration, it may take several minutes to stop and restart polling events from the stream. 

This means that if you specify `LATEST` as the starting position for the stream, the pipe could miss events sent during pipe creation or updates. To ensure no events are missed, specify the stream starting position as `TRIM_HORIZON`.
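
For example, the starting position is set in the pipe's source parameters at creation time. The fragment below sketches the `SourceParameters` shape for an Amazon MSK source as accepted by the Pipes `CreatePipe` API; treat the key names as an assumption and verify them against the current API reference:

```
{
  "ManagedStreamingKafkaParameters": {
    "TopicName": "mytopic",
    "StartingPosition": "TRIM_HORIZON",
    "BatchSize": 100,
    "MaximumBatchingWindowInSeconds": 5
  }
}
```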

## MSK cluster authentication
<a name="pipes-msk-cluster-permissions"></a>

EventBridge needs permission to access the Amazon MSK cluster, retrieve records, and perform other tasks. Amazon MSK supports several options for controlling client access to the MSK cluster. For more information about which authentication method is used when, see [How EventBridge chooses a bootstrap broker](#pipes-msk-bootstrap-brokers).

**Topics**
+ [Unauthenticated access](#pipes-msk-permissions-none)
+ [SASL/SCRAM authentication](#pipes-msk-permissions-add-secret)
+ [IAM role-based authentication](#pipes-msk-permissions-iam-policy)
+ [Mutual TLS authentication](#pipes-msk-permissions-mTLS)
+ [Configuring the mTLS secret](#smaa-auth-secret)
+ [How EventBridge chooses a bootstrap broker](#pipes-msk-bootstrap-brokers)

### Unauthenticated access
<a name="pipes-msk-permissions-none"></a>

We recommend using unauthenticated access only for development. Unauthenticated access works only if IAM role-based authentication is disabled for the cluster.

### SASL/SCRAM authentication
<a name="pipes-msk-permissions-add-secret"></a>

Amazon MSK supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with Transport Layer Security (TLS) encryption. For EventBridge to connect to the cluster, you store the authentication credentials (sign-in credentials) in an AWS Secrets Manager secret.

For more information about using Secrets Manager, see [User name and password authentication with AWS Secrets Manager](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

Amazon MSK doesn't support SASL/PLAIN authentication.

### IAM role-based authentication
<a name="pipes-msk-permissions-iam-policy"></a>

You can use IAM to authenticate the identity of clients that connect to the MSK cluster. If IAM authentication is active on your MSK cluster, and you don't provide a secret for authentication, EventBridge automatically defaults to using IAM authentication. To create and deploy IAM user or role-based policies, use the IAM console or API. For more information, see [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

To allow EventBridge to connect to the MSK cluster, read records, and perform other required actions, add the following permissions to your pipe's execution role.

------
#### [ JSON ]

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeClusterDynamicConfiguration"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789012:cluster/cluster-name/cluster-uuid",
                "arn:aws:kafka:us-east-1:123456789012:topic/cluster-name/cluster-uuid/topic-name",
                "arn:aws:kafka:us-east-1:123456789012:group/cluster-name/cluster-uuid/consumer-group-id"
            ]
        }
    ]
}
```

------

You can scope these permissions to a specific cluster, topic, and group. For more information, see the [Amazon MSK Kafka actions](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html#kafka-actions) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

### Mutual TLS authentication
<a name="pipes-msk-permissions-mTLS"></a>

Mutual TLS (mTLS) provides two-way authentication between the client and server. The client sends a certificate to the server for the server to verify the client, and the server sends a certificate to the client for the client to verify the server. 

For Amazon MSK, EventBridge acts as the client. You configure a client certificate (as a secret in Secrets Manager) to authenticate EventBridge with the brokers in your MSK cluster. The client certificate must be signed by a certificate authority (CA) in the server's trust store. The MSK cluster sends a server certificate to EventBridge to authenticate the brokers with EventBridge. The server certificate must be signed by a CA that's in the AWS trust store. 

Amazon MSK doesn't support self-signed server certificates, because all brokers in Amazon MSK use [public certificates](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html) signed by [Amazon Trust Services CAs](https://www.amazontrust.com/repository/), which EventBridge trusts by default.

For more information about mTLS for Amazon MSK, see [Mutual TLS Authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

### Configuring the mTLS secret
<a name="smaa-auth-secret"></a>

The `CLIENT_CERTIFICATE_TLS_AUTH` secret requires a certificate field and a private key field. For an encrypted private key, the secret also requires the private key password. Both the certificate and private key must be in PEM format.

**Note**  
EventBridge supports the [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (but not PBES2) private key encryption algorithms.

The certificate field must contain a list of certificates, beginning with the client certificate, followed by any intermediate certificates, and ending with the root certificate. Each certificate must start on a new line with the following structure:

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager supports secrets up to 65,536 bytes, which is enough space for long certificate chains.

The private key must be in [PKCS #8](https://datatracker.ietf.org/doc/html/rfc5208) format, with the following structure:

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

For an encrypted private key, use the following structure:

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

The following example shows the contents of a secret for mTLS authentication using an encrypted private key. For an encrypted private key, you include the private key password in the secret.

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```
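
Before storing the secret, you can sanity-check that its fields follow the structures above. This is a hedged Python sketch: the validation rules mirror this section, and the function names are ours, not an AWS API.

```
def pem_label(value):
    """Return the label of the first PEM block, e.g. 'ENCRYPTED PRIVATE KEY'."""
    first = value.strip().splitlines()[0].strip()
    if not (first.startswith("-----BEGIN ") and first.endswith("-----")):
        raise ValueError(f"not a PEM block: {first!r}")
    return first[len("-----BEGIN "):-len("-----")]

def validate_mtls_secret(secret):
    """Sanity-check the certificate and privateKey fields of an mTLS secret."""
    if pem_label(secret["certificate"]) != "CERTIFICATE":
        raise ValueError("certificate field must start with a CERTIFICATE block")
    key_label = pem_label(secret["privateKey"])
    if key_label not in ("PRIVATE KEY", "ENCRYPTED PRIVATE KEY"):
        raise ValueError("privateKey field must be a PKCS #8 private key")
    # An encrypted private key must be accompanied by its password.
    if key_label == "ENCRYPTED PRIVATE KEY" and "privateKeyPassword" not in secret:
        raise ValueError("encrypted private keys require privateKeyPassword")

secret = {
    "privateKeyPassword": "testpassword",
    "certificate": "-----BEGIN CERTIFICATE-----\nMIIE...\n-----END CERTIFICATE-----",
    "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----\nMIIF...\n-----END ENCRYPTED PRIVATE KEY-----",
}
validate_mtls_secret(secret)  # raises ValueError if the structure is malformed
```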

### How EventBridge chooses a bootstrap broker
<a name="pipes-msk-bootstrap-brokers"></a>

EventBridge chooses a [bootstrap broker](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html) based on the authentication methods available on your cluster and whether you provide a secret for authentication. If you provide a secret for mTLS or SASL/SCRAM, EventBridge automatically chooses that authentication method. If you don't provide a secret, EventBridge chooses the strongest authentication method that's active on your cluster. The following is the order of priority in which EventBridge selects a broker, from strongest to weakest authentication:
+ mTLS (secret provided for mTLS)
+ SASL/SCRAM (secret provided for SASL/SCRAM)
+ SASL IAM (no secret provided, and IAM authentication is active)
+ Unauthenticated TLS (no secret provided, and IAM authentication is not active)
+ Plaintext (no secret provided, and both IAM authentication and unauthenticated TLS are not active)

**Note**  
If EventBridge can't connect to the most secure broker type, it doesn't attempt to connect to a different (weaker) broker type. If you want EventBridge to choose a weaker broker type, deactivate all stronger authentication methods on your cluster.
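
The selection order above can be sketched as a simple function. This is illustrative only; the names are ours, not an EventBridge API:

```
def choose_auth_method(secret_type, iam_active, unauth_tls_active):
    """Pick the strongest available authentication method, per the priority order."""
    if secret_type == "mtls":
        return "mTLS"
    if secret_type == "sasl_scram":
        return "SASL/SCRAM"
    if iam_active:
        return "SASL IAM"
    if unauth_tls_active:
        return "Unauthenticated TLS"
    return "Plaintext"
```

Note that the choice is made once, from the top down; a weaker method is never tried as a fallback.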

## Network configuration
<a name="pipes-msk-vpc-config"></a>

EventBridge must have access to the Amazon Virtual Private Cloud (Amazon VPC) resources associated with your Amazon MSK cluster.
+ To access the VPC of your Amazon MSK cluster, EventBridge can use outbound internet access for the subnets of your source. For private subnets it can be a NAT gateway, or your own NAT. Ensure that the NAT has a public IP address and can connect to the internet. For public subnets you must use VPC Endpoints (explained below).
+ EventBridge Pipes also supports event delivery through [AWS PrivateLink](https://aws.amazon.com/privatelink/), allowing you to send events from an event source located in an Amazon Virtual Private Cloud (Amazon VPC) to a Pipes target without traversing the public internet. You can use Pipes to poll from Amazon Managed Streaming for Apache Kafka (Amazon MSK), self-managed Apache Kafka, and Amazon MQ sources residing in a private subnet without the need to deploy an internet gateway, configure firewall rules, or set up proxy servers. You can also use VPC Endpoints to support delivery from Kafka clusters in public subnets.

  To set up a VPC endpoint, see [Create a VPC endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws) in the *AWS PrivateLink User Guide*. For service name, select `com.amazonaws.region.pipes-data`.

Configure your Amazon VPC security groups with the following rules (at minimum):
+ Inbound rules – Allow all traffic on the Amazon MSK broker port for the security groups specified for your source.
+ Outbound rules – Allow all traffic on port 443 for all destinations. Allow all traffic on the Amazon MSK broker port for the security groups specified for your source.

  Broker ports include:
  + 9092 for plaintext
  + 9094 for TLS
  + 9096 for SASL
  + 9098 for IAM

**Note**  
Your Amazon VPC configuration is discoverable through the [Amazon MSK API](https://docs.aws.amazon.com/msk/1.0/apireference/resources.html). You don't need to configure it during setup.

## Customizable consumer group ID
<a name="pipes-msk-consumer-group"></a>

When setting up Apache Kafka as a source, you can specify a consumer group ID. This consumer group ID is an existing identifier for the Apache Kafka consumer group that you want your pipe to join. You can use this feature to migrate any ongoing Apache Kafka record processing setups from other consumers to EventBridge.

If you specify a consumer group ID and there are other active pollers within that consumer group, Apache Kafka distributes messages across all consumers. In other words, EventBridge doesn't receive all messages for the Apache Kafka topic. If you want EventBridge to handle all messages in the topic, turn off any other pollers in that consumer group.

Additionally, if you specify a consumer group ID, and Apache Kafka finds a valid existing consumer group with the same ID, EventBridge ignores the `StartingPosition` parameter for your pipe. Instead, EventBridge begins processing records according to the committed offset of the consumer group. If you specify a consumer group ID, and Apache Kafka can't find an existing consumer group, then EventBridge configures your source with the specified `StartingPosition`.

The consumer group ID that you specify must be unique among all your Apache Kafka event sources. After creating a pipe with the consumer group ID specified, you can't update this value.
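
The `StartingPosition` behavior described above amounts to the following rule (an illustrative sketch, not EventBridge code):

```
def resolve_start_position(consumer_group_exists, starting_position):
    """Where EventBridge begins reading when a consumer group ID is supplied."""
    if consumer_group_exists:
        # A valid existing consumer group wins: StartingPosition is ignored and
        # processing resumes from the group's committed offset.
        return "COMMITTED_OFFSET"
    return starting_position
```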

## Auto scaling of the Amazon MSK source
<a name="pipes-msk-ops-scaling"></a>

When you initially create an Amazon MSK source, EventBridge allocates one consumer to process all partitions in the Apache Kafka topic. Each consumer has multiple processors running in parallel to handle increased workloads. Additionally, EventBridge automatically scales up or down the number of consumers, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.
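
The one-consumer-per-partition ceiling can be expressed as a one-line rule (illustrative sketch; EventBridge manages the actual count internally):

```
def target_consumer_count(partitions, desired):
    """Cap consumers at one per partition to preserve per-partition ordering."""
    return max(1, min(desired, partitions))
```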

In one-minute intervals, EventBridge evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than EventBridge can process them. If necessary, EventBridge adds or removes consumers from the topic. The scaling process of adding or removing consumers occurs within three minutes of evaluation.

If your target is overloaded, EventBridge reduces the number of consumers. This action reduces the workload on the pipe by reducing the number of messages that consumers can retrieve and send to the pipe.

# Apache Kafka streams as a source in EventBridge Pipes
<a name="eb-pipes-kafka"></a>

Apache Kafka is an open-source event streaming platform that supports workloads such as data pipelines and streaming analytics. You can use [Amazon Managed Streaming for Apache Kafka](eb-pipes-msk.md) (Amazon MSK) or a self-managed Apache Kafka cluster. In AWS terminology, a *self-managed* cluster is any Apache Kafka cluster not hosted by AWS. This includes both clusters you manage yourself and those hosted by a third-party provider, such as [Confluent](https://www.confluent.io/), [CloudKarafka](https://www.cloudkarafka.com/), or [Redpanda](https://redpanda.com/).

For more information on other AWS hosting options for your cluster, see [Best Practices for Running Apache Kafka on AWS](https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-kafka-on-aws/) on the *AWS Big Data Blog*.

Apache Kafka as a source operates similarly to using Amazon Simple Queue Service (Amazon SQS) or Amazon Kinesis. EventBridge internally polls for new messages from the source and then synchronously invokes the target. EventBridge reads the messages in batches and provides them to the target as an event payload. The maximum batch size is configurable. (The default is 100 messages.)

For Apache Kafka-based sources, EventBridge supports processing control parameters, such as batching windows and batch size.

EventBridge sends the batch of messages in the event when it invokes the target. The event payload contains an array of messages. Each array item contains details of the Apache Kafka topic and partition identifier, together with a timestamp and a base64-encoded message.

**Example events**

The following sample event shows the information that is received by the pipe. You can use this event to create and filter your event patterns, or to define input transformation. Not all of the fields can be filtered. For more information about which fields you can filter, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

```
[
  {
    "eventSource": "SelfManagedKafka",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "eventSourceKey": "mytopic-0",
    "topic": "mytopic",
    "partition": 0,
    "offset": 15,
    "timestamp": 1545084650987,
    "timestampType": "CREATE_TIME",
    "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", 
    "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
    "headers": [
      {
        "headerKey": [
          104,
          101,
          97,
          100,
          101,
          114,
          86,
          97,
          108,
          117,
          101
        ]
      }
    ]
  }
]
```

## Apache Kafka cluster authentication
<a name="pipes-smaa-authentication"></a>

EventBridge Pipes supports several methods to authenticate with your self managed Apache Kafka cluster. Make sure that you configure the Apache Kafka cluster to use one of these supported authentication methods. For more information about Apache Kafka security, see the [Security](http://kafka.apache.org/documentation.html#security) section of the Apache Kafka documentation.

### VPC access
<a name="pipes-smaa-auth-vpc"></a>

If you are using a self managed Apache Kafka environment where only Apache Kafka users within your VPC have access to your Apache Kafka brokers, you must configure the Amazon Virtual Private Cloud (Amazon VPC) in the Apache Kafka source. 

### SASL/SCRAM authentication
<a name="pipes-smaa-auth-sasl"></a>

EventBridge Pipes supports Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication with Transport Layer Security (TLS) encryption. EventBridge Pipes sends the encrypted credentials to authenticate with the cluster. For more information about SASL/SCRAM authentication, see [RFC 5802](https://tools.ietf.org/html/rfc5802).

EventBridge Pipes supports SASL/PLAIN authentication with TLS encryption. With SASL/PLAIN authentication, EventBridge Pipes sends credentials as clear text (unencrypted) to the server.

For SASL authentication, you store the sign-in credentials as a secret in AWS Secrets Manager.

### Mutual TLS authentication
<a name="pipes-smaa-auth-mtls"></a>

Mutual TLS (mTLS) provides two-way authentication between the client and server. The client sends a certificate to the server for the server to verify the client, and the server sends a certificate to the client for the client to verify the server. 

In self managed Apache Kafka, EventBridge Pipes acts as the client. You configure a client certificate (as a secret in Secrets Manager) to authenticate EventBridge Pipes with your Apache Kafka brokers. The client certificate must be signed by a certificate authority (CA) in the server's trust store.

The Apache Kafka cluster sends a server certificate to EventBridge Pipes to authenticate the Apache Kafka brokers with EventBridge Pipes. The server certificate can be a public CA certificate or a private CA/self-signed certificate. The public CA certificate must be signed by a CA that's in the EventBridge Pipes trust store. For a private CA/self-signed certificate, you configure the server root CA certificate (as a secret in Secrets Manager). EventBridge Pipes uses the root certificate to verify the Apache Kafka brokers.

For more information about mTLS, see [Introducing mutual TLS authentication for Amazon MSK as an event source](https://aws.amazon.com/blogs/compute/introducing-mutual-tls-authentication-for-amazon-msk-as-an-event-source).

### Configuring the client certificate secret
<a name="pipes-smaa-auth-secret"></a>

The `CLIENT_CERTIFICATE_TLS_AUTH` secret requires a certificate field and a private key field. For an encrypted private key, the secret also requires the private key password. Both the certificate and private key must be in PEM format.

**Note**  
EventBridge Pipes supports the [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (but not PBES2) private key encryption algorithms.

The certificate field must contain a list of certificates, beginning with the client certificate, followed by any intermediate certificates, and ending with the root certificate. Each certificate must start on a new line with the following structure:

```
-----BEGIN CERTIFICATE-----  
        <certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager supports secrets up to 65,536 bytes, which is enough space for long certificate chains.

The private key must be in [PKCS #8](https://datatracker.ietf.org/doc/html/rfc5208) format, with the following structure:

```
-----BEGIN PRIVATE KEY-----  
         <private key contents>
-----END PRIVATE KEY-----
```

For an encrypted private key, use the following structure:

```
-----BEGIN ENCRYPTED PRIVATE KEY-----  
          <private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

The following example shows the contents of a secret for mTLS authentication using an encrypted private key. For an encrypted private key, include the private key password in the secret.

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

### Configuring the server root CA certificate secret
<a name="pipes-smaa-auth-ca-cert"></a>

You create this secret if your Apache Kafka brokers use TLS encryption with certificates signed by a private CA. You can use TLS encryption for VPC, SASL/SCRAM, SASL/PLAIN, or mTLS authentication.

The server root CA certificate secret requires a field that contains the Apache Kafka broker's root CA certificate in PEM format. The following example shows the structure of the secret.

```
{
    "certificate": "-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG...
-----END CERTIFICATE-----"
}
```

## Network configuration
<a name="pipes-kafka-vpc-config"></a>

If you are using a self managed Apache Kafka environment that uses private VPC connectivity, EventBridge must have access to the Amazon Virtual Private Cloud (Amazon VPC) resources associated with your Apache Kafka brokers. 
+ To access the VPC of your Apache Kafka cluster, EventBridge can use outbound internet access for the subnets of your source. For private subnets it can be a NAT gateway, or your own NAT. Ensure that the NAT has a public IP address and can connect to the internet. For public subnets you must use VPC Endpoints (explained below).
+ EventBridge Pipes also supports event delivery through [AWS PrivateLink](https://aws.amazon.com/privatelink/), allowing you to send events from an event source located in an Amazon Virtual Private Cloud (Amazon VPC) to a Pipes target without traversing the public internet. You can use Pipes to poll from Amazon Managed Streaming for Apache Kafka (Amazon MSK), self-managed Apache Kafka, and Amazon MQ sources residing in a private subnet without the need to deploy an internet gateway, configure firewall rules, or set up proxy servers. You can also use VPC Endpoints to support delivery from Kafka clusters in public subnets.

  To set up a VPC endpoint, see [Create a VPC endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/create-interface-endpoint.html#create-interface-endpoint-aws) in the *AWS PrivateLink User Guide*. For service name, select `com.amazonaws.region.pipes-data`.

Configure your Amazon VPC security groups with the following rules (at minimum):
+ Inbound rules – Allow all traffic on the Apache Kafka broker port for the security groups specified for your source.
+ Outbound rules – Allow all traffic on port 443 for all destinations. Allow all traffic on the Apache Kafka broker port for the security groups specified for your source.

  Broker ports include:
  + 9092 for plaintext
  + 9094 for TLS
  + 9096 for SASL
  + 9098 for IAM

## Consumer auto scaling with Apache Kafka sources
<a name="pipes-kafka-ops-scaling"></a>

When you initially create an Apache Kafka source, EventBridge allocates one consumer to process all partitions in the Kafka topic. Each consumer has multiple processors running in parallel to handle increased workloads. Additionally, EventBridge automatically scales up or down the number of consumers, based on workload. To preserve message ordering in each partition, the maximum number of consumers is one consumer per partition in the topic.

In one-minute intervals, EventBridge evaluates the consumer offset lag of all the partitions in the topic. If the lag is too high, the partition is receiving messages faster than EventBridge can process them. If necessary, EventBridge adds or removes consumers from the topic. The scaling process of adding or removing consumers occurs within three minutes of evaluation.

If your target is overloaded, EventBridge reduces the number of consumers. This action reduces the workload on the target by reducing the number of messages that consumers can retrieve and send to it.

# Amazon Simple Queue Service as a source in EventBridge Pipes
<a name="eb-pipes-sqs"></a>

You can use EventBridge Pipes to receive records from an Amazon SQS queue. You can then optionally filter or enhance these records before sending them to an available destination for processing.

You can use a pipe to process messages in an Amazon Simple Queue Service (Amazon SQS) queue. EventBridge Pipes supports [standard queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html) and [first-in, first-out (FIFO) queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html). With Amazon SQS, you can offload tasks from one component of your application by sending them to a queue and processing them asynchronously.

EventBridge polls the queue and invokes your pipe synchronously with an event that contains queue messages. EventBridge reads messages in batches and invokes your pipe once for each batch. When your pipe successfully processes a batch, EventBridge deletes its messages from the queue.

By default, EventBridge polls up to 10 messages in your queue simultaneously and sends that batch to your pipe. To avoid invoking the pipe with a small number of records, you can tell the event source to buffer records for up to five minutes by configuring a batch window. Before invoking the pipe, EventBridge continues to poll messages from the Amazon SQS standard queue until one of these things occurs:
+ The batch window expires.
+ The invocation payload size quota is reached.
+ The configured maximum batch size is reached.

**Note**  
If you're using a batch window and your Amazon SQS queue contains low traffic, EventBridge might wait for up to 20 seconds before invoking your pipe. This is true even if you set a batch window for fewer than 20 seconds. For FIFO queues, records contain additional attributes that are related to deduplication and sequencing.
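The batching settings described above map to the `SqsQueueParameters` structure of the Pipes `CreatePipe` API. Here is a minimal sketch using the AWS SDK for Python (Boto3); the pipe name, role, and resource ARNs are placeholders:

```python
# Batching settings for an Amazon SQS source: up to 10 messages per batch,
# buffered for up to 300 seconds (5 minutes) before the pipe is invoked.
# These keys mirror the CreatePipe API's SourceParameters structure.
source_parameters = {
    "SqsQueueParameters": {
        "BatchSize": 10,                        # configured maximum batch size
        "MaximumBatchingWindowInSeconds": 300,  # batch window
    }
}

# With Boto3, pass the structure when creating the pipe.
# The names and ARNs below are placeholders.
#
# import boto3
# pipes = boto3.client("pipes")
# pipes.create_pipe(
#     Name="my-sqs-pipe",
#     RoleArn="arn:aws:iam::123456789012:role/my-pipe-role",
#     Source="arn:aws:sqs:us-east-2:123456789012:my-queue",
#     Target="arn:aws:lambda:us-east-2:123456789012:function:my-function",
#     SourceParameters=source_parameters,
# )

print(source_parameters)
```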

When EventBridge reads a batch, the messages stay in the queue but are hidden for the length of the queue's [visibility timeout](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html). If your pipe successfully processes the batch, EventBridge deletes the messages from the queue. By default, if your pipe encounters an error while processing a batch, all messages in that batch become visible in the queue again. For this reason, your pipe code must be able to process the same message multiple times without unintended side effects. You can modify this reprocessing behavior by including batch item failures in your pipe response. The following example shows an event for a batch of two messages.

**Example events**

The following sample event shows the information that is received by the pipe. You can use this event to create and filter your event patterns, or to define input transformation. Not all of the fields can be filtered. For more information about which fields you can filter, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).

**Standard queue**

```
[
  {
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
    "body": "Test message.",
    "attributes": {
      "ApproximateReceiveCount": "1",
      "SentTimestamp": "1545082649183",
      "SenderId": "AIDAIENQZJOLO23YVJ4VO",
      "ApproximateFirstReceiveTimestamp": "1545082649185"
    },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
    "awsRegion": "us-east-2"
  },
  {
    "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
    "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
    "body": "Test message.",
    "attributes": {
      "ApproximateReceiveCount": "1",
      "SentTimestamp": "1545082650636",
      "SenderId": "AIDAIENQZJOLO23YVJ4VO",
      "ApproximateFirstReceiveTimestamp": "1545082650649"
    },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
    "awsRegion": "us-east-2"
  }
]
```

**FIFO queue**

```
[
  {
    "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5",
    "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...",
    "body": "Test message.",
    "attributes": {
      "ApproximateReceiveCount": "1",
      "SentTimestamp": "1573251510774",
      "SequenceNumber": "18849496460467696128",
      "MessageGroupId": "1",
      "SenderId": "AIDAIO23YVJENQZJOL4VO",
      "MessageDeduplicationId": "1",
      "ApproximateFirstReceiveTimestamp": "1573251510774"
    },
    "messageAttributes": {},
    "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
    "eventSource": "aws:sqs",
    "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo",
    "awsRegion": "us-east-2"
  }
]
```

## Scaling and processing
<a name="pipes-sqs-scaling"></a>

For standard queues, EventBridge uses [long polling](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html) to poll a queue until it becomes active. When messages are available, EventBridge reads up to five batches and sends them to your pipe. If messages are still available, EventBridge increases the number of processes that are reading batches by up to 300 more instances per minute. The maximum number of batches that a pipe can process simultaneously is 1,000.

For FIFO queues, EventBridge sends messages to your pipe in the order that it receives them. When you send a message to a FIFO queue, you specify a [message group ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html). Amazon SQS facilitates delivering messages in the same group to EventBridge, in order. EventBridge sorts the received messages into groups and sends only one batch at a time for a group. If your pipe returns an error, the pipe attempts all retries on the affected messages before EventBridge receives additional messages from the same group.
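The per-group delivery described above can be sketched in a few lines. This is an illustration of the ordering model, not EventBridge's implementation:

```python
from collections import defaultdict

def next_batches(messages, batch_size=10):
    """Bucket messages by MessageGroupId, preserving arrival order,
    and emit at most one in-flight batch per group at a time."""
    groups = defaultdict(list)
    for msg in messages:
        groups[msg["attributes"]["MessageGroupId"]].append(msg)
    return {gid: msgs[:batch_size] for gid, msgs in groups.items()}

# Three messages across two message groups, in arrival order.
msgs = [
    {"messageId": "a", "attributes": {"MessageGroupId": "1"}},
    {"messageId": "b", "attributes": {"MessageGroupId": "2"}},
    {"messageId": "c", "attributes": {"MessageGroupId": "1"}},
]
batches = next_batches(msgs)
print(sorted(batches))  # one batch per group: ['1', '2']
```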

## Configuring a queue to use with EventBridge Pipes
<a name="pipes-sqs-configure-queue"></a>

[Create an Amazon SQS queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-create-queue.html) to serve as a source for your pipe. Then configure the queue to allow time for your pipe to process each batch of events—and for EventBridge to retry in response to throttling errors as it scales up.

To allow your pipe time to process each batch of records, set the source queue's visibility timeout to at least six times the combined runtime of the pipe enrichment and target components. The extra time allows for EventBridge to retry if your pipe is throttled while processing a previous batch.
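As a quick illustrative calculation (not an AWS API), the recommended visibility timeout is:

```python
import math

def recommended_visibility_timeout(enrichment_seconds: float,
                                   target_seconds: float) -> int:
    """Return a visibility timeout of at least six times the combined
    runtime of the pipe's enrichment and target components."""
    return math.ceil(6 * (enrichment_seconds + target_seconds))

# Example: a 5-second enrichment plus a 10-second target call suggests
# a visibility timeout of at least 90 seconds.
print(recommended_visibility_timeout(5, 10))  # 90
```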

If your pipe fails to process a message multiple times, Amazon SQS can send it to a [dead-letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html). When your pipe returns an error, EventBridge keeps the message in the queue. After the visibility timeout expires, EventBridge receives the message again. To send messages to a second queue after a number of receives, configure a dead-letter queue on your source queue.

**Note**  
Make sure that you configure the dead-letter queue on the source queue, not on the pipe. The dead-letter queue that you configure on a pipe is used for the pipe's asynchronous invocation queue, not for source queues.

If your pipe returns an error, or can't be invoked because it's at maximum concurrency, processing might succeed with additional attempts. To give messages more chances to be processed before sending them to the dead-letter queue, set the `maxReceiveCount` on the source queue's redrive policy to at least **5**.

## Reporting batch item failures
<a name="pipes-sqs-batch-failures"></a>

When EventBridge consumes and processes streaming data from a source, by default it checkpoints to the highest sequence number of a batch, but only when the batch is a complete success. To avoid reprocessing successfully processed messages in a failed batch, you can configure your enrichment or target to return an object indicating which messages succeeded and which failed. This is called a partial batch response.

For more information, see [Partial batch failure](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure).

### Success and failure conditions
<a name="pipes-sqs-batch-failures-conditions"></a>

If you return any of the following, EventBridge treats a batch as a complete success:
+ An empty `batchItemFailures` list
+ A null `batchItemFailures` list
+ An empty `EventResponse`
+ A null `EventResponse`

If you return any of the following, EventBridge treats a batch as a complete failure:
+ An empty string `itemIdentifier`
+ A null `itemIdentifier`
+ An `itemIdentifier` with a bad key name

EventBridge retries failures based on your retry strategy.
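For example, a Lambda function used as an enrichment or target can report per-message failures like this. The sketch assumes the SQS batch event format shown earlier; `process` is a hypothetical processing step:

```python
def process(body):
    # Hypothetical processing step -- raises to simulate a failure.
    if body == "bad":
        raise ValueError("cannot process")

def handler(event, context=None):
    """Process a batch of SQS records; report failures individually so
    EventBridge retries only the failed messages, not the whole batch."""
    batch_item_failures = []
    for record in event:  # the pipe delivers the batch as a JSON array
        try:
            process(record["body"])
        except Exception:
            # Identify the failed message by its messageId.
            batch_item_failures.append({"itemIdentifier": record["messageId"]})
    # An empty list signals complete success.
    return {"batchItemFailures": batch_item_failures}

print(handler([{"messageId": "1", "body": "ok"},
               {"messageId": "2", "body": "bad"}]))
```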

# Event filtering in Amazon EventBridge Pipes
<a name="eb-pipes-event-filtering"></a>

With EventBridge Pipes, you can filter a given source's events and process only a subset of them. This filtering works in the same way as filtering on an EventBridge event bus or Lambda event source mapping, by using event patterns. For more information about event patterns, see [Creating Amazon EventBridge event patterns](eb-event-patterns.md).

A filter criteria `FilterCriteria` object is a structure that consists of a list of filters (`Filters`). Each filter is a structure that defines a filtering pattern (`Pattern`). A `Pattern` is a string representation of a JSON filter rule. A `FilterCriteria` object looks like the following example:

```
{
  "Filters": [
    {"Pattern": "{ \"Metadata1\": [ pattern1 ], \"data\": { \"Data1\": [ pattern2 ] }}"
    }
  ]
}
```

For added clarity, here is the value of the filter's `Pattern` expanded in plain JSON:

```
{
  "Metadata1": [ pattern1 ],
  "data": {"Data1": [ pattern2 ]}
}
```
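Because each `Pattern` is a JSON document serialized as a string, it's easiest to build with a JSON library rather than escaping quotes by hand. A sketch in Python, with placeholder values standing in for *pattern1* and *pattern2*:

```python
import json

# Plain-JSON form of the pattern; placeholder values stand in for
# pattern1 and pattern2.
pattern = {
    "Metadata1": ["metadata-value"],
    "data": {"Data1": ["data-value"]},
}

# FilterCriteria wraps the pattern as an escaped JSON string.
filter_criteria = {"Filters": [{"Pattern": json.dumps(pattern)}]}

# Round-tripping the string recovers the original pattern.
assert json.loads(filter_criteria["Filters"][0]["Pattern"]) == pattern
print(filter_criteria["Filters"][0]["Pattern"])
```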

Amazon Kinesis, Amazon MQ, Amazon MSK, and self-managed Apache Kafka apply Base64 encoding to the payload, but not the metadata fields. For example, suppose your Kinesis stream contains an event like this:

```
{
  "kinesisSchemaVersion": "1.0",
  "partitionKey": "1",
  "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
  "data": {"City": "Seattle",
    "State": "WA",
    "Temperature": "46",
    "Month": "December"
  },
  "approximateArrivalTimestamp": 1545084650.987
}
```

When the event flows through your pipe, it looks like the following, with the `data` field base64-encoded:

```
{
  "kinesisSchemaVersion": "1.0",
  "partitionKey": "1",
  "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
  "data": "eyJDaXR5IjoiU2VhdHRsZSIsIlN0YXRlIjoiV0EiLCJUZW1wZXJhdHVyZSI6IjQ2IiwiTW9udGgiOiJEZWNlbWJlciJ9",
  "approximateArrivalTimestamp": 1545084650.987,
  "eventSource": "aws:kinesis",
  "eventVersion": "1.0",
  "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
  "eventName": "aws:kinesis:record",
  "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
  "awsRegion": "us-east-2",
  "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

When you're creating event filters, EventBridge Pipes can access event content. This content is either JSON-escaped, such as the Amazon SQS `body` field, or base64-encoded, such as the Kinesis `data` field. If your data is valid JSON, your input templates or JSON paths for target parameters can reference the content directly, as EventBridge Pipes will automatically decode it. For example, if a Kinesis event source is valid JSON, you can reference a variable using `<$.data.someKey>`.

Continuing our example, to filter on the non-encoded `partitionKey` metadata outside the `data` object and the base64-encoded `City` property inside the `data` object, you would use the following filter:

```
{
  "partitionKey": [ "1" ],
  "data": {
    "City": [ "Seattle" ]
  }
}
```
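To see why this filter matches, you can reproduce the evaluation with a deliberately minimal matcher. This stand-in handles only exact-value matching and is not EventBridge's pattern engine:

```python
import base64
import json

# A Kinesis record as the pipe receives it: metadata in the clear,
# payload base64-encoded in the "data" field.
record = {
    "partitionKey": "1",
    "data": base64.b64encode(
        json.dumps({"City": "Seattle", "State": "WA"}).encode()
    ).decode(),
}

def matches(event, pattern):
    """Minimal exact-value matcher: a leaf pattern is a list of allowed
    values; nested dicts match recursively. Like EventBridge, the base64
    "data" payload is decoded before filtering."""
    for key, expected in pattern.items():
        value = event.get(key)
        if key == "data" and isinstance(value, str):
            value = json.loads(base64.b64decode(value))
        if isinstance(expected, dict):
            if not (isinstance(value, dict) and matches(value, expected)):
                return False
        elif value not in expected:
            return False
    return True

pattern = {"partitionKey": ["1"], "data": {"City": ["Seattle"]}}
print(matches(record, pattern))  # True: both fields match
```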

When creating event patterns, you can filter based on the fields sent by the source API, and not on fields added by the polling operation. The following fields can't be used in event patterns:
+ `awsRegion`
+ `eventSource`
+ `eventSourceARN`
+ `eventVersion`
+ `eventID`
+ `eventName`
+ `invokeIdentityArn`
+ `eventSourceKey`

The following sections explain filtering behavior for each supported event source type.

## Filtering Amazon SQS messages
<a name="pipes-filter-sqs"></a>

If an Amazon SQS message doesn't satisfy your filter criteria, EventBridge automatically removes the message from the queue. You don't have to delete these messages manually in Amazon SQS. Connecting multiple pipes to one SQS queue is unlikely to be a useful configuration: the pipes would compete for messages, and messages that don't match a given pipe's filter would be dropped from the queue.

An Amazon SQS message body can contain any string, not just JSON. EventBridge Pipes expects your `FilterCriteria` format to match the format of the incoming messages, either valid JSON or a plain string. If there is a mismatch, EventBridge Pipes drops the message. If your `FilterCriteria` don't include `body`, meaning you filter only by metadata, EventBridge Pipes skips this check. The following table summarizes the evaluation:


| Filter pattern format | Incoming format | Result | 
| --- | --- | --- | 
|  Plain string  |  Plain string  |  EventBridge filters based on your filter criteria.  | 
|  Plain string  |  Valid JSON  |  EventBridge drops the message.  | 
|  Valid JSON  |  Plain string  |  EventBridge drops the message.  | 
|  Valid JSON  |  Valid JSON  |  EventBridge filters based on your filter criteria.  | 
|  No filter pattern for `body`  |  Plain string  |  EventBridge filters based on your filter criteria.  | 
|  No filter pattern for `body`  |  Valid JSON  |  EventBridge filters based on your filter criteria.  | 

## Filtering Kinesis and DynamoDB messages
<a name="pipes-filter-kinesis-dynamobd"></a>

After your filter criteria processes a Kinesis or DynamoDB record, the streams iterator advances past this record. If the record doesn't satisfy your filter criteria, you don't have to delete the record manually from your event source. After the retention period, Kinesis and DynamoDB automatically delete these old records. If you want records to be deleted sooner, see [Changing the Data Retention Period](https://docs.aws.amazon.com/kinesis/latest/dev/kinesis-extended-retention.html).

To properly filter events from stream event sources, both the data field and your filter criteria for the data field must be in valid JSON format. (For Kinesis, the data field is `data`. For DynamoDB, the data field is `dynamodb`.) If either field isn't in a valid JSON format, EventBridge drops the message or throws an exception. The following table summarizes the specific behavior:


| Filter pattern format | Incoming format | Result | 
| --- | --- | --- | 
|  Valid JSON  |  Valid JSON  |  EventBridge filters based on your filter criteria.  | 
|  Valid JSON  |  Non-JSON  |  EventBridge drops the message.  | 
|  No filter pattern for `data` (Kinesis) or `dynamodb` (DynamoDB)  |  Valid JSON  |  EventBridge filters based on your filter criteria.  | 
|  No filter pattern for `data` (Kinesis) or `dynamodb` (DynamoDB)  |  Non-JSON  |  EventBridge filters based on your filter criteria.  | 
|  Non-JSON  |  Any  |  EventBridge throws an exception at the time of Pipe creation or update. The filter pattern must be valid JSON format.  | 

## Filtering Amazon Managed Streaming for Apache Kafka, self-managed Apache Kafka, and Amazon MQ messages
<a name="pipes-filter-poller"></a>

**Note**  
After you attach filter criteria to a pipe with an Apache Kafka or Amazon MQ event source, it can take up to 15 minutes to apply your filtering rules to events.

For [Amazon MQ sources](eb-pipes-mq.md), the message field is `data`. For Apache Kafka sources ([Amazon MSK](eb-pipes-msk.md) and [self managed Apache Kafka](eb-pipes-kafka.md)), there are two message fields: `key` and `value`.

EventBridge drops messages that don't match all fields included in the filter. For Apache Kafka, EventBridge commits offsets for matched and unmatched messages after successfully invoking the target. For Amazon MQ, EventBridge acknowledges matched messages after successfully invoking the target and acknowledges unmatched messages when filtering them.

Apache Kafka and Amazon MQ messages must be UTF-8 encoded strings, either plain strings or in JSON format. That's because EventBridge decodes Apache Kafka and Amazon MQ byte arrays into UTF-8 before applying filter criteria. If your messages use another encoding, such as UTF-16 or ASCII, or if the message format doesn't match the `FilterCriteria` format, EventBridge processes metadata filters only. The following table summarizes the specific behavior:


| Filter pattern format | Incoming format | Result | 
| --- | --- | --- | 
|  Plain string  |  Plain string  |  EventBridge filters based on your filter criteria.  | 
|  Plain string  |  Valid JSON  |  EventBridge filters on metadata only, ignoring the `data` field (Amazon MQ) or `key` and `value` fields (Apache Kafka)  | 
|  Valid JSON  |  Plain string  |  EventBridge filters on metadata only, ignoring the `data` field (Amazon MQ) or `key` and `value` fields (Apache Kafka)  | 
|  Valid JSON  |  Valid JSON  |  EventBridge filters based on your filter criteria.  | 
|  No filter pattern for `data` (Amazon MQ) or `key` and `value` (Apache Kafka)  |  Plain string  |  EventBridge filters on metadata only, ignoring the `data` field (Amazon MQ) or `key` and `value` fields (Apache Kafka)  | 
|  No filter pattern for `data` (Amazon MQ) or `key` and `value` (Apache Kafka)  |  Valid JSON  |  EventBridge filters on metadata only, ignoring the `data` field (Amazon MQ) or `key` and `value` fields (Apache Kafka)  | 
|  Any  |  Non-UTF encoded string  |  EventBridge filters on metadata only, ignoring the `data` field (Amazon MQ) or `key` and `value` fields (Apache Kafka)  | 

## Differences between Lambda ESM and EventBridge Pipes
<a name="pipes-filter-esm-diff"></a>

When filtering events, Lambda ESM and EventBridge Pipes operate generally the same way. The main difference is that the `eventSourceKey` field isn't present in ESM payloads.

## Using comparison operators in pipe filters
<a name="pipes-filter-comparison-operators"></a>

Comparison operators enable you to construct event patterns that match against field values in events.

For a complete list of the comparison operators supported for use in pipe filters, see [Comparison operators](eb-create-pattern-operators.md).

# Event enrichment in Amazon EventBridge Pipes
<a name="pipes-enrichment"></a>

With the enrichment step of EventBridge Pipes, you can enhance the data from the source before sending it to the target. For example, you might receive *Ticket created* events that don’t include the full ticket data. Using enrichment, you can have a Lambda function call the `get-ticket` API for the full ticket details. Pipes can then send that information to a [target](eb-pipes-event-target.md).

You can configure the following enrichments when setting up a pipe in EventBridge:
+ API destination
+ Amazon API Gateway
+ Lambda function
+ Step Functions state machine
**Note**  
EventBridge Pipes only supports [Express workflows](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html) as enrichments.

EventBridge invokes enrichments synchronously because it must wait for a response from the enrichment before invoking the target.

Enrichment responses are limited to a maximum size of 6 MB.

You can also transform the data you receive from the source before sending it for enhancement. For more information, see [Amazon EventBridge Pipes input transformation](eb-pipes-input-transformation.md).

## Filtering events using enrichment
<a name="pipes-enrichment-filtering"></a>

EventBridge Pipes passes the enrichment responses directly to the configured target. This includes array responses for targets that support batches. For more information about batch behavior, see [Amazon EventBridge Pipes batching and concurrency](eb-pipes-batching-concurrency.md). You can also use your enrichment as a filter and pass fewer events than were received from the source. If you don’t want to invoke the target, return an empty response, such as `""`, `{}`, or `[]`.

**Note**  
If you want to invoke the target with an empty payload, return an array with empty JSON `[{}]`.
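For example, a Lambda function used as an enrichment can pass through only the events you want delivered and return an empty list to suppress target invocation. This is a sketch; the `priority` field is a hypothetical attribute of your events:

```python
def enrichment_handler(event, context=None):
    """Enrichment acting as a filter: pass through only high-priority
    events. Returning [] means the target is not invoked at all."""
    return [e for e in event if e.get("priority") == "high"]

# Only the high-priority event reaches the target.
print(enrichment_handler([{"priority": "high", "id": 1},
                          {"priority": "low", "id": 2}]))
```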

## Invoking enrichments
<a name="pipes-invocation"></a>

EventBridge invokes enrichments synchronously (invocation type set to `REQUEST_RESPONSE`) because it must wait for a response from the enrichment before invoking the target.

**Note**  
For Step Functions state machines, EventBridge only supports [Express workflows](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html) as enrichments, because they can be invoked synchronously.

# Amazon EventBridge Pipes targets
<a name="eb-pipes-event-target"></a>

You can send data in your pipe to a specific target. You can configure the following targets when setting up a pipe in EventBridge:
+ [API destination](eb-api-destinations.md)
+ [API Gateway](eb-api-gateway-target.md)
+ [Batch job queue](#pipes-targets-specifics-batch)
+ [CloudWatch log group](#pipes-targets-specifics-cwl)
+ [ECS task](#pipes-targets-specifics-ecs-task)
+ [Event bus in the same account and Region](#pipes-targets-specifics-eventbridge)
+ Firehose delivery stream
+ Inspector assessment template
+ Kinesis stream
+ [Lambda function (SYNC or ASYNC)](#pipes-targets-specifics-lambda-stepfunctions)
+ Redshift cluster data API queries
+ SageMaker AI Pipeline
+ Amazon SNS topic (SNS FIFO topics not supported)
+ Amazon SQS queue
+ [Step Functions state machine](#pipes-targets-specifics-lambda-stepfunctions)
  + Express workflows (SYNC or ASYNC)
  + Standard workflows (ASYNC)
+ [Timestream for LiveAnalytics table](#pipes-targets-specifics-timestream)

## Target parameters
<a name="pipes-targets-specific-parms"></a>

Some target services don't send the event payload to the target; instead, they treat the event as a trigger for invoking a specific API. EventBridge uses the [PipeTargetParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetParameters.html) to specify what information gets sent to that API. These include the following:
+ API destinations (The data sent to an API destination must match the structure of the API. You must use the [InputTemplate](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetParameters.html#pipes-Type-PipeTargetParameters-InputTemplate) object to make sure the data is structured correctly. If you want to include the original event payload, reference it in the `InputTemplate`.)
+ API Gateway (The data sent to API Gateway must match the structure of the API. You must use the [InputTemplate](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetParameters.html#pipes-Type-PipeTargetParameters-InputTemplate) object to make sure the data is structured correctly. If you want to include the original event payload, reference it in the `InputTemplate`.)
+ [PipeTargetRedshiftDataParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetRedshiftDataParameters.html) (Amazon Redshift Data API clusters)
+ [PipeTargetSageMakerPipelineParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetSageMakerPipelineParameters.html) (Amazon SageMaker Runtime Model Building Pipelines)
+ [PipeTargetBatchJobParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetBatchJobParameters.html) (AWS Batch)

**Note**  
EventBridge does not support all JSON path syntax and evaluates it at runtime. Supported syntax includes:
+ dot notation (for example, `$.detail`)
+ dashes
+ underscores
+ alphanumeric characters
+ array indices
+ wildcards (`*`)
+ forward slashes

### Dynamic path parameters
<a name="pipes-targets-dynamic-parms"></a>

EventBridge Pipes target parameters support optional dynamic JSON path syntax. You can use this syntax to specify JSON paths instead of static values (for example `$.detail.state`). The entire value has to be a JSON path, not only part of it. For example, `RedshiftParameters.Sql` can be `$.detail.state` but it can't be `"SELECT * FROM $.detail.state"`. These paths are replaced dynamically at runtime with data from the event payload itself at the specified path. Dynamic path parameters can't reference new or transformed values resulting from input transformation. The supported syntax for dynamic parameter JSON paths is the same as when transforming input. For more information, see [Amazon EventBridge Pipes input transformation](eb-pipes-input-transformation.md).

Dynamic syntax can be used on all string, non-enum fields of all EventBridge Pipes enrichment and target parameters except:
+ [PipeTargetCloudWatchLogsParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetCloudWatchLogsParameters.html)
+ [PipeTargetEventBridgeEventBusParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetEventBridgeEventBusParameters.html)
+ [PipeEnrichmentHttpParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeEnrichmentHttpParameters.html)
+ [PipeTargetHttpParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetHttpParameters.html)

For example, to set the `PartitionKey` of a pipe Kinesis target to a custom key from your source event, set the [KinesisTargetParameter.PartitionKey](https://docs.aws.amazon.com/) to: 
+ `"$.data.someKey"` for a Kinesis source
+ `"$.body.someKey"` for an Amazon SQS source

Then, if the event payload is a valid JSON string, such as `{"someKey":"someValue"}`, EventBridge extracts the value from the JSON path and uses it as the target parameter. In this example, EventBridge would set the Kinesis `PartitionKey` to "*someValue*".
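Conceptually, the resolution works like the following simplified sketch (dot-notation paths only; not the actual implementation):

```python
import json

def resolve_json_path(path, event):
    """Resolve a simple dot-notation JSON path such as "$.body.someKey"
    against an event. JSON-escaped string fields like the SQS body are
    decoded first, mirroring how Pipes exposes valid JSON content."""
    assert path.startswith("$.")
    value = event
    for part in path[2:].split("."):
        if isinstance(value, str):
            value = json.loads(value)  # decode JSON-escaped content
        value = value[part]
    return value

# An SQS record whose body is a JSON-escaped string.
sqs_record = {"messageId": "1", "body": '{"someKey": "someValue"}'}

# The resolved value would be used as the Kinesis PartitionKey.
print(resolve_json_path("$.body.someKey", sqs_record))  # someValue
```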

## Permissions
<a name="pipes-targets-permissions"></a>

To make API calls on the resources that you own, EventBridge Pipes needs appropriate permissions. EventBridge Pipes uses the IAM role that you specify on the pipe for enrichment and target calls, using the IAM principal `pipes.amazonaws.com`.

## Invoking targets
<a name="pipes-targets-invocation"></a>

EventBridge has the following ways to invoke a target:
+ **Synchronously** (invocation type set to `REQUEST_RESPONSE`) – EventBridge waits for a response from the target before proceeding.
+ **Asynchronously** (invocation type set to `FIRE_AND_FORGET`) – EventBridge doesn't wait for a response before proceeding.

By default, for pipes with ordered sources, EventBridge invokes targets synchronously because a response from the target is needed before proceeding to the next event. 

If a source doesn't enforce order, such as a standard Amazon SQS queue, EventBridge can invoke a supported target synchronously or asynchronously. 

With Lambda functions and Step Functions state machines, you can configure the invocation type.

**Note**  
For Step Functions state machines, [Standard workflows](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html) must be invoked asynchronously.

## Payload size limits
<a name="pipes-targets-payload-size"></a>

EventBridge Pipes supports payloads up to 6 MB. However, the effective payload size limit is determined by whichever is smaller: the Pipes limit of 6 MB or the target service's maximum payload size. For example:
+ Lambda functions support payloads up to 6 MB, so the effective limit for a pipe targeting Lambda is 6 MB.
+ EventBridge event buses support payloads up to 1 MB, so the effective limit for a pipe targeting an event bus is 1 MB.
+ Step Functions state machines support payloads up to 256 KB, so the effective limit for a pipe targeting Step Functions is 256 KB.

When configuring your pipe, ensure your payload size, including any transformations applied by enrichment or input transformation, does not exceed the target's maximum payload size.
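In other words, the effective limit is the minimum of the Pipes limit and the target limit, as this small sketch shows using the example targets above:

```python
PIPES_LIMIT = 6 * 1024 * 1024  # 6 MB Pipes payload limit

# Example target payload limits from this section, in bytes.
TARGET_LIMITS = {
    "lambda": 6 * 1024 * 1024,     # 6 MB
    "event_bus": 1 * 1024 * 1024,  # 1 MB
    "step_functions": 256 * 1024,  # 256 KB
}

def effective_payload_limit(target: str) -> int:
    """The effective limit is whichever is smaller: the Pipes limit
    or the target service's maximum payload size."""
    return min(PIPES_LIMIT, TARGET_LIMITS[target])

print(effective_payload_limit("step_functions"))  # 262144
```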

## AWS Batch job queues target specifics
<a name="pipes-targets-specifics-batch"></a>

All AWS Batch `submitJob` parameters are configured explicitly with `BatchParameters`. As with all pipe parameters, these parameters can be dynamic when using a JSON path to your incoming event payload.

## CloudWatch Logs group target specifics
<a name="pipes-targets-specifics-cwl"></a>

Whether you use an input transformer or not, the event payload is used as the log message. You can set the `Timestamp` (or the explicit `LogStreamName` of your destination) through `CloudWatchLogsParameters` in `PipeTarget`. As with all pipe parameters, these parameters can be dynamic when using a JSON path to your incoming event payload.

## Amazon ECS task target specifics
<a name="pipes-targets-specifics-ecs-task"></a>

All Amazon ECS `runTask` parameters are configured explicitly through `EcsParameters`. As with all pipe parameters, these parameters can be dynamic when using a JSON path to your incoming event payload.

## Lambda functions and Step Functions workflow target specifics
<a name="pipes-targets-specifics-lambda-stepfunctions"></a>

Lambda and Step Functions do not have a batch API. To process batches of events from a pipe source, the batch is converted to a JSON array and passed as input to the Lambda or Step Functions target. For more information, see [Amazon EventBridge Pipes batching and concurrency](eb-pipes-batching-concurrency.md). 
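For example, a Lambda function behind a pipe always receives a list, even when the batch contains a single record. A minimal Python handler might look like the following (the record fields shown are hypothetical, for illustration only):

```python
def handler(event, context):
    """Pipe batches arrive as a JSON array, so `event` is always a list,
    even when the batch size is 1."""
    results = []
    for record in event:        # one iteration per source record
        results.append(record)  # process each record individually
    return {"processed": len(results)}

# The handler can be exercised locally with a single-record batch:
# handler([{"messageId": "id1", "body": "hello"}], None)
```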

## Timestream for LiveAnalytics table target specifics
<a name="pipes-targets-specifics-timestream"></a>

Considerations when specifying a Timestream for LiveAnalytics table as a pipe target include:
+ Apache Kafka streams (including from Amazon MSK or third-party providers) are not currently supported as a pipe source.
+ If you have specified a Kinesis or DynamoDB stream as the pipe source, you must specify the number of retry attempts.

  For more information, see [Configuring the pipe settings](eb-pipes-create.md#pipes-configure-pipe-settings).

## EventBridge event bus target specifics
<a name="pipes-targets-specifics-eventbridge"></a>

When you configure an EventBridge event bus as a pipe target, the payload from your pipe is automatically placed in the `detail` section of the EventBridge event. Use [PipeTargetEventBridgeEventBusParameters](https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_PipeTargetEventBridgeEventBusParameters.html) to configure the event's `source` and `detail-type` fields. Both fields support dynamic JSON path syntax to extract values from your event payload. For example, set `Source` to `$.body.source` or `DetailType` to `$.data.eventType`. You can also use input transformers to modify the event structure before it's placed in the `detail` field. For more information, see [Amazon EventBridge Pipes input transformation](eb-pipes-input-transformation.md).

# Amazon EventBridge Pipes batching and concurrency
<a name="eb-pipes-batching-concurrency"></a>

## Batching behavior
<a name="pipes-batching"></a>

EventBridge Pipes supports batching from the source and to targets that support it. In addition, batching to enrichment is supported for AWS Lambda and AWS Step Functions. Because different services support different levels of batching, you can’t configure a pipe with a larger batch size than the target supports. For example, Amazon Kinesis stream sources support a maximum batch size of 10,000 records, but Amazon Simple Queue Service supports a maximum of 10 messages per batch as a target. Therefore, a pipe from a Kinesis stream to an Amazon SQS queue can have a maximum configured batch size on the source of 10.

If you configure a pipe with an enrichment or target that doesn’t support batching, you won’t be able to activate batching on the source.

When batching is activated on the source, arrays of JSON records are passed through the pipe and then mapped to the batch API of a supported enrichment or target. [Input transformers](eb-pipes-input-transformation.md) are applied separately on each individual JSON record in the array, not the array as a whole. For examples of these arrays, see [Amazon EventBridge Pipes sources](eb-pipes-event-source.md) and select a specific source. Pipes will use the batch API for the supported enrichment or target even if the batch size is 1. If the enrichment or target doesn’t have a batch API but receives full JSON payloads, such as Lambda and Step Functions, the entire JSON array is sent in one request. The request will be sent as a JSON array even if the batch size is 1.

If a pipe is configured for batching at the source, and the target supports batching, you can return an array of JSON items from your enrichment. This array can include a shorter or longer array than the original source. However, if the array is larger than the batch size supported by the target, the pipe won’t invoke the target.
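The constraint in the last sentence can be sketched as a simple check (illustrative only; EventBridge applies this validation itself):

```python
def can_invoke_target(enriched_batch: list, target_max_batch_size: int) -> bool:
    """The pipe won't invoke the target if the enrichment returns more
    items than the target's maximum batch size."""
    return len(enriched_batch) <= target_max_batch_size
```

An enrichment may shrink or grow the batch, but a 12-item result cannot be delivered to a target, such as an Amazon SQS queue, that accepts at most 10 items per batch.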

### Supported batchable targets
<a name="pipes-batchable-target"></a>


| Target | Maximum batch size | 
| --- | --- | 
| CloudWatch Logs | 10,000 | 
| EventBridge event bus | 10 | 
| Firehose stream | 500 | 
| Kinesis stream | 500 | 
| Lambda function | customer defined | 
| Step Functions state machine | customer defined | 
| Amazon SNS topic | 10 | 
| Amazon SQS queue | 10 | 

The following enrichments and targets receive the full batch event payload for processing and are constrained by the total payload size of the event, rather than the size of the batch:
+ Step Functions state machine (262,144 characters)
+ Lambda function (6 MB)

### Partial batch failure
<a name="pipes-partial-batch-failure"></a>

For Amazon SQS and stream sources, such as Kinesis and DynamoDB, EventBridge Pipes supports partial batch failure handling of target failures. If the target supports batching and only part of the batch succeeds, EventBridge automatically retries batching the remainder of the payload. For the most up-to-date enriched content, this retry occurs through the entire pipe, including re-invoking any configured enrichment.

Partial batch failure handling of the enrichment is not supported.

For Lambda and Step Functions targets, you can also specify a partial failure by returning a payload with a defined structure from the target. This indicates the events that need to be retried. 

**Example partial failure payload structure**

```
{
  "batchItemFailures": [
    {
      "itemIdentifier": "id2"
    },
    {
      "itemIdentifier": "id4"
    }
  ]
}
```

In the example, the `itemIdentifier` values match the IDs of the events handled by your target, from their original source. For Amazon SQS, this is the `messageId`. For Kinesis and DynamoDB, this is the `eventID`. For EventBridge Pipes to adequately handle partial batch failures from the targets, these fields need to be included in any array payload returned by the enrichment.
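A Lambda target can build this structure from its per-record results. The following is a minimal sketch, assuming an Amazon SQS source (so the identifier field is `messageId`) and a hypothetical `process` callable:

```python
def partial_failure_response(records, process):
    """Return the partial-failure structure EventBridge Pipes expects:
    failed records are reported by their source ID (messageId for an
    Amazon SQS source)."""
    failures = []
    for record in records:
        try:
            process(record)  # hypothetical per-record business logic
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

If processing fails only for `id2` and `id4`, the returned value matches the example payload above, and only those records are retried.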

## Throughput and concurrency behavior
<a name="pipes-concurrency"></a>

Every event or batch of events received by a pipe that travels to an enrichment or target is considered a pipe *execution*. A pipe in the `STARTED` state continuously polls for events from the source, scaling up and down depending on the available backlog and configured batching settings. 

For quotas on concurrent pipe executions, and number of pipes per account and Region, see [EventBridge Pipes quotas](eb-quota.md#eb-pipes-limits).

By default, a single pipe will scale to the following maximum concurrent executions, depending on the source:
+ **DynamoDB** – The concurrent executions can climb as high as the `ParallelizationFactor` configured on the pipe multiplied by the number of shards in the stream.
+ **Apache Kafka** – The concurrent executions can climb as high as the number of partitions on the topic, up to 1,000.
+ **Kinesis** – The concurrent executions can climb as high as the `ParallelizationFactor` configured on the pipe multiplied by the number of shards in the stream.
+ **Amazon MQ** – 5
+ **Amazon SQS** – 1250
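For the stream sources above, the ceiling is a simple product; the other sources use fixed defaults. The following sketch captures that arithmetic, with values taken from the list above (`shards` also stands in for the number of Kafka partitions here):

```python
def max_concurrent_executions(source: str, shards: int = 0,
                              parallelization_factor: int = 1) -> int:
    """Default maximum concurrent pipe executions by source type."""
    if source in ("kinesis", "dynamodb"):
        # ParallelizationFactor multiplied by the number of shards
        return parallelization_factor * shards
    if source == "kafka":
        # one execution per partition, capped at 1,000
        return min(shards, 1000)
    return {"amazon_mq": 5, "sqs": 1250}[source]  # fixed defaults
```

For example, a Kinesis stream with 4 shards and a `ParallelizationFactor` of 10 can reach 40 concurrent executions.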

If you have requirements for higher maximum polling throughputs or concurrency limits, [contact support](https://console.aws.amazon.com/support/home?#/case/create?issueType=technical).

**Note**  
The execution limits are considered best-effort safety limitations. Although polling isn't throttled below these values, a pipe or account might burst higher than these recommended values.

Pipe executions are limited to a maximum of 5 minutes including the enrichment and target processing. This limit currently can't be increased.

Pipes with strictly ordered sources (such as Amazon SQS FIFO queues, Kinesis and DynamoDB streams, or Apache Kafka topics) are further limited in concurrency by the configuration of the source, such as the number of message group IDs for FIFO queues or the number of shards for Kinesis streams. Because ordering is strictly guaranteed within these constraints, a pipe with an ordered source can't exceed those concurrency limits. 

# Amazon EventBridge Pipes input transformation
<a name="eb-pipes-input-transformation"></a>

Amazon EventBridge Pipes supports optional input transformers when passing data to the enrichment and the target. You can use input transformers to reshape the JSON event input payload to serve the needs of the enrichment or target service. For Amazon API Gateway and API destinations, this is how you shape the input event to the RESTful model of your API. Input transformers are modeled as an `InputTemplate` parameter. They can be free text, a JSON path to the event payload, or a JSON object that includes inline JSON paths to the event payload. For enrichment, the event payload comes from the source. For targets, the event payload is what is returned from the enrichment, if one is configured on the pipe. In addition to the service-specific data in the event payload, you can use [reserved variables](#input-transform-reserved) in your `InputTemplate` to reference data for your pipe.

To access items in an array, use square bracket notation.

**Note**  
EventBridge does not support all JSON path syntax, and evaluates it at runtime. Supported syntax includes:   
dot notation (for example, `$.detail`)
dashes
underscores
alphanumeric characters
array indices
wildcards (`*`)
forward slashes

The following are sample `InputTemplate` parameters referencing an Amazon SQS event payload:

**Static string**

```
InputTemplate: "Hello, sender"
```

**JSON Path**

```
InputTemplate: <$.attributes.SenderId>
```

**Dynamic string**

```
InputTemplate: "Hello, <$.attributes.SenderId>"
```

**Static JSON**

```
InputTemplate: >
{
  "key1": "value1",
  "key2": "value2",
  "key3": "value3"
}
```

**Dynamic JSON**

```
InputTemplate: >
{
  "key1": "value1",
  "key2": <$.body.key>,
  "d": <aws.pipes.event.ingestion-time>
}
```

Using square bracket notation to access an item in an array:

```
InputTemplate: >
{
  "key1": "value1",
  "key2": <$.body.Records[3]>,
  "d": <aws.pipes.event.ingestion-time>
}
```

**Note**  
EventBridge replaces input transformers at runtime to ensure a valid JSON output. Because of this, put quotes around variables that refer to JSON path parameters, but do not put quotes around variables that refer to JSON objects or arrays.

## Reserved variables
<a name="input-transform-reserved"></a>

Input templates can use the following reserved variables:
+ `<aws.pipes.pipe-arn>` – The Amazon Resource Name (ARN) of the pipe.
+ `<aws.pipes.pipe-name>` – The name of the pipe.
+ `<aws.pipes.source-arn>` – The ARN of the event source of the pipe.
+ `<aws.pipes.enrichment-arn>` – The ARN of the enrichment of the pipe.
+ `<aws.pipes.target-arn>` – The ARN of the target of the pipe.
+ `<aws.pipes.event.ingestion-time>` – The time at which the event was received by the input transformer. This is an ISO 8601 timestamp. This time is different for the enrichment input transformer and the target input transformer, depending on when the enrichment completed processing the event.
+ `<aws.pipes.event>` – The event as received by the input transformer.

  For an enrichment input transformer, this is the event from the source. This contains the original payload from the source, plus additional service-specific metadata. See the topics in [Amazon EventBridge Pipes sources](eb-pipes-event-source.md) for service-specific examples.

  For a target input transformer, this is the event returned by the enrichment, if one is configured, with no additional metadata. As such, an enrichment-returned payload may be non-JSON. If no enrichment is configured on the pipe, this is the event from the source with metadata.
+ `<aws.pipes.event.json>` – The same as `aws.pipes.event`, but the variable only has a value if the original payload, either from the source or returned by the enrichment, is JSON. If the pipe has an encoded field, such as the Amazon SQS `body` field or the Kinesis `data`, those fields are decoded and turned into valid JSON. Because it isn't escaped, the variable can only be used as a value for a JSON field. For more information, see [Implicit body data parsing](#input-transform-implicit).

## Input transform example
<a name="input-transform-example"></a>

The following is an example Amazon EC2 event that we can use as our *sample event*.

```
{
  "version": "0",
  "id": "7bf73129-1428-4cd3-a780-95db273d1602",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "123456789012",
  "time": "2015-11-11T21:29:54Z",
  "region": "us-east-1",
  "resources": [
    "arn:aws:ec2:us-east-1:123456789012:instance/i-abcd1111"
  ],
  "detail": {
    "instance-id": "i-0123456789",
    "state": "RUNNING"
  }
}
```

Let's use the following JSON as our *Transformer*.

```
{
  "instance" : <$.detail.instance-id>,
  "state": <$.detail.state>,
  "pipeArn" : <aws.pipes.pipe-arn>,
  "pipeName" : <aws.pipes.pipe-name>,
  "originalEvent" : <aws.pipes.event.json>
}
```

The following will be the resulting *Output*:

```
{
  "instance" : "i-0123456789",
  "state": "RUNNING",
  "pipeArn" : "arn:aws:pipes:us-east-1:123456789012:pipe/example",
  "pipeName" : "example",
  "originalEvent" : {
    ... // commented for brevity
  }
}
```
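The substitution in this example can be mimicked with a short script. This is an illustrative model only, not the actual EventBridge implementation, and it handles only JSON path variables, not reserved variables:

```python
import json
import re

def apply_template(template: str, event: dict) -> str:
    """Toy model of InputTemplate substitution: each <$.path> token is
    replaced by the JSON encoding of the value at that path, so string
    values come out quoted while objects and arrays are inlined as-is."""
    def resolve(match):
        value = event
        for key in match.group(1).split("."):
            value = value[key]  # walk the dotted path into the event
        return json.dumps(value)
    return re.sub(r"<\$\.([\w.\-]+)>", resolve, template)
```

Applied to the sample event, `apply_template('{"instance": <$.detail.instance-id>, "state": <$.detail.state>}', event)` yields the quoted string values shown in the output above.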

## Implicit body data parsing
<a name="input-transform-implicit"></a>

Certain fields in the incoming payload may be JSON-escaped, such as the Amazon SQS `body` object, or base64-encoded, such as the Kinesis `data` object. For both [filtering](eb-pipes-event-filtering.md) and input transformation, EventBridge transforms these fields into valid JSON so sub-values can be referenced directly. For example, `<$.data.someKey>` for Kinesis.
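This implicit parsing can be mimicked locally when testing filters or transformers against a Kinesis record. The following is a toy model, not the EventBridge implementation:

```python
import base64
import json

def decode_kinesis_data(record: dict) -> dict:
    """Mimic implicit body parsing: the Kinesis `data` field is
    base64-encoded, so decode it and parse it as JSON. Sub-values such
    as <$.data.someKey> can then be referenced directly."""
    raw = base64.b64decode(record["data"])
    return json.loads(raw)

# Example record whose original payload is {"someKey": "someValue"}:
record = {"data": base64.b64encode(b'{"someKey": "someValue"}').decode()}
# decode_kinesis_data(record)["someKey"] evaluates to "someValue"
```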

To have the target receive the original payload without any additional metadata, use an input transformer with this body data, specific to the source. For example, `<$.body>` for Amazon SQS, or `<$.data>` for Kinesis. If the original payload is a valid JSON string (for example, `{"key": "value"}`), then using the input transformer with source-specific body data results in the quotes within the original source payload being removed. For example, `{"key": "value"}` becomes `"{key: value}"` when delivered to the target. If your target requires valid JSON payloads (for example, an EventBridge event bus, Lambda, or Step Functions), this causes delivery failure. To have the target receive the original source data without generating invalid JSON, wrap the source body data input transformer in JSON. For example, `{"data": <$.data>}`.

Implicit body parsing can also be used to dynamically populate values for most pipe target or enrichment parameters. For more information, see [Dynamic path parameters](eb-pipes-event-target.md#pipes-targets-dynamic-parms).

**Note**  
If the original payload is valid JSON, this field will contain the unescaped, non-base64-encoded JSON. However, if the payload is not valid JSON, EventBridge base64-encodes the fields listed below, with the exception of Amazon SQS.
+ **Active MQ** – `data`
+ **Kinesis** – `data`
+ **Amazon MSK** – `key` and `value`
+ **Rabbit MQ** – `data`
+ **Self managed Apache Kafka** – `key` and `value`
+ **Amazon SQS** – `body`

## Common issues with transforming input
<a name="eb-pipes-transform-input-issues"></a>

These are some common issues when transforming input in EventBridge pipes:
+  For Strings, quotes are required.
+  There is no validation when creating JSON path for your template.
+  If you specify a variable to match a JSON path that doesn't exist in the event, that variable isn't created and won't appear in the output.
+ JSON properties like `aws.pipes.event.json` can only be used as the value of a JSON field, not inline in other strings.
+  EventBridge doesn't escape values extracted by *Input Path*, when populating the *Input Template* for a target.
+ If a JSON path references a JSON object or array, but the variable is referenced in a string, EventBridge removes any internal quotes to ensure a valid string. For example, `"Body is <$.body>"` would result in EventBridge removing quotes from the object. 

  Therefore, if you want to output a JSON object based on a single JSON path variable, you must place it as a key. In this example, `{"body": <$.body>}`.
+ Quotes are not required for variables that represent strings. They are permitted, but EventBridge Pipes automatically adds quotes to string variable values during transformation to ensure the output is valid JSON. Do not add quotes around variables that represent JSON objects or arrays; EventBridge Pipes does not add quotes to those.

  For example, the following input template includes variables that represent both strings and JSON objects:

  ```
  {
    "pipeArn" : <aws.pipes.pipe-arn>,
    "pipeName" : <aws.pipes.pipe-name>,
    "originalEvent" : <aws.pipes.event.json>
  }
  ```

  Resulting in valid JSON with proper quotation:

  ```
  {
    "pipeArn" : "arn:aws:pipes:us-east-2:123456789012:pipe/example",
    "pipeName" : "example",
    "originalEvent" : {
      ... // commented for brevity
    }
  }
  ```
+ For Lambda or Step Functions enrichments or targets, batches are delivered to the target as JSON arrays, even if the batch size is 1. However, input transformers will still be applied to individual records in the JSON Array, not the array as a whole. For more information, see [Amazon EventBridge Pipes batching and concurrency](eb-pipes-batching-concurrency.md).
+ Input transformers and filtering can extract JSON values that have been string-encoded once, but not values that have been string-encoded twice. This commonly occurs when an Amazon SNS message is sent to Amazon SQS. When Amazon SQS receives the Amazon SNS message, it stringifies the entire message. When Pipes then receives this Amazon SQS message, the Amazon SNS message content appears in the `body` field and is accessible. However, if the Amazon SNS `Message` field itself contains stringified JSON, that nested content is double-encoded and cannot be accessed by input transformers or filters. For example, `<$.body.TopicArn>` is accessible, but `<$.body.Message.operation>` is not if the `Message` field contains stringified JSON such as `"{\\\"operation\\\":\\\"UPDATE\\\",\\\"email\\\":\\\"user@example.com\\\"}"`.

  To work around this limitation, use an enrichment step with a Lambda function to parse the double-encoded content and extract the nested values. For more information about enrichment, see [Enrichment](pipes-enrichment.md).
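A minimal enrichment function for this workaround might parse the nested `Message` string back into JSON. This sketch assumes each record's `body` arrives as a JSON string containing a stringified SNS `Message` field:

```python
import json

def handler(event, context):
    """Enrichment: parse the double-encoded SNS Message inside each
    Amazon SQS record so downstream filters and transformers can reach
    its fields, such as <$.body.Message.operation>."""
    enriched = []
    for record in event:                   # the pipe delivers a JSON array
        body = json.loads(record["body"])  # first level: the SQS body
        body["Message"] = json.loads(body["Message"])  # second level: SNS Message
        record["body"] = body
        enriched.append(record)
    return enriched
```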

# Logging Amazon EventBridge Pipes performance
<a name="eb-pipes-logs"></a>

EventBridge Pipes logging enables you to have EventBridge Pipes send records detailing pipe performance to supported AWS services. Use logs to gain insight into your pipe’s execution performance, and to help with troubleshooting and debugging.

You can select the following AWS services as *log destinations* to which EventBridge Pipes delivers records:
+ CloudWatch Logs

  EventBridge delivers log records to the specified CloudWatch Logs log group. 

  Use CloudWatch Logs to centralize the logs from all of your systems, applications, and AWS services that you use, in a single, highly scalable service. For more information, see [Working with log groups and log streams](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html) in the *Amazon CloudWatch Logs User Guide*.
+ Firehose stream logs

  EventBridge delivers log records to a Firehose delivery stream. 

  Amazon Data Firehose is a fully-managed service for delivering real-time streaming data to destinations such as certain AWS services, as well as any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers. For more information, see [Creating an Amazon Data Firehose delivery stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) in the *Amazon Data Firehose User Guide*.
+ Amazon S3 logs

  EventBridge delivers log records as Amazon S3 objects to the specified bucket.

  Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. For more information, see [Uploading, downloading, and working with objects in Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/uploading-downloading-objects.html) in the *Amazon Simple Storage Service User Guide*.

## How Amazon EventBridge Pipes logging works
<a name="eb-pipes-logs-overview"></a>

A pipe *execution* is an event or batch of events received by a pipe that travels to an enrichment, a target, or both. If enabled, EventBridge generates a log record for each execution step it performs as the event batch is processed. The information contained in the record applies to the event batch, be it a single event or up to 10,000 events.

You can configure the size of the event batch on the pipe source and target. For more information, see [Amazon EventBridge Pipes batching and concurrency](eb-pipes-batching-concurrency.md).

The record data sent to each log destination is the same.

If an Amazon CloudWatch Logs destination is configured, the log records delivered to all destinations have a size limit of 256 KB. Fields are truncated as necessary.

You can customize the records EventBridge sends to the selected log destinations in the following way:
+ You can specify the *log level*, which determines the execution steps for which EventBridge sends records to the selected log destinations. For more information, see [Specifying EventBridge Pipes log level](#eb-pipes-logs-level).
+ You can specify whether EventBridge Pipes includes execution data in records for execution steps where it is relevant. This data includes: 
  + The payload of the event batch
  + The request sent to the AWS enrichment or target service
  + The response returned by the AWS enrichment or target service

  For more information, see [Including execution data in EventBridge Pipes logs](#eb-pipes-logs-execution-data).

## Specifying EventBridge Pipes log level
<a name="eb-pipes-logs-level"></a>

You can specify the types of execution steps for which EventBridge sends records to the selected log destinations. 

Choose from the following levels of detail to include in log records. The log level applies to all log destinations specified for the pipe. Each log level includes the execution steps of the previous log levels.
+ **OFF** – EventBridge does not send any records to any specified log destinations. This is the default setting.
+ **ERROR** – EventBridge sends any records related to errors generated during pipe execution to the specified log destinations.
+ **INFO** – EventBridge sends any records related to errors, as well as select other steps performed during pipe execution to the specified log destinations.
+ **TRACE** – EventBridge sends any records generated during any steps in the pipe execution to the specified log destinations.

In the EventBridge console, CloudWatch Logs is selected as a log destination by default, as is the `ERROR` log level. So, by default, EventBridge Pipes creates a new CloudWatch log group to which it sends log records containing the `ERROR` level of detail. No default is selected when you configure logs programmatically. 
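The inclusion rule, where each level contains the steps of the less detailed levels, can be expressed as a simple ordering (a sketch of the rule, not EventBridge's implementation):

```python
LOG_LEVELS = ["OFF", "ERROR", "INFO", "TRACE"]  # increasing detail

def is_logged(step_level: str, configured_level: str) -> bool:
    """A step is logged when the pipe's configured level is at least as
    detailed as the level the step belongs to (OFF logs nothing)."""
    return (configured_level != "OFF"
            and LOG_LEVELS.index(configured_level) >= LOG_LEVELS.index(step_level))
```

For example, `Execution Failed` is an ERROR-level step, so it appears at the ERROR, INFO, and TRACE levels but never at OFF, while `Target Invocation Started` appears only at TRACE.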

The following table lists the execution steps included in each log level.


****  

| Step | TRACE | INFO | ERROR | OFF | 
| --- | --- | --- | --- | --- | 
|  Execution Failed  | x | x | x |   | 
|  Execution Partially Failed  | x | x | x |  | 
|  Execution Started  | x | x |   |   | 
|  Execution Succeeded  | x | x |   |   | 
|  Execution Throttled  | x | x | x |   | 
|  Execution Timeout  | x | x | x |   | 
|  Enrichment Invocation Failed  | x | x | x |  | 
|  Enrichment Invocation Skipped  | x | x |  |  | 
|  Enrichment Invocation Started  | x |  |  |  | 
|  Enrichment Invocation Succeeded  | x |  |  |  | 
|  Enrichment Stage Entered  | x | x |  |  | 
|  Enrichment Stage Failed  | x | x | x |  | 
|  Enrichment Stage Succeeded  | x | x |  |  | 
|  Enrichment Transformation Failed  | x | x | x |  | 
|  Enrichment Transformation Started  | x |  |  |  | 
|  Enrichment Transformation Succeeded  | x |  |  |  | 
|  Target Invocation Failed  | x | x | x |  | 
|  Target Invocation Partially Failed  | x | x | x |  | 
|  Target Invocation Skipped  | x |  |  |  | 
|  Target Invocation Started  | x |  |  |  | 
|  Target Invocation Succeeded  | x |  |  |  | 
|  Target Stage Entered  | x | x |  |  | 
|  Target Stage Failed  | x | x | x |  | 
|  Target Stage Partially Failed  | x | x | x |  | 
|  Target Stage Skipped  | x |  |  |  | 
|  Target Stage Succeeded  | x | x |  |  | 
|  Target Transformation Failed  | x | x | x |  | 
|  Target Transformation Started  | x |  |  |  | 
|  Target Transformation Succeeded  | x |  |  |  | 

## Including execution data in EventBridge Pipes logs
<a name="eb-pipes-logs-execution-data"></a>

You can specify for EventBridge to include *execution data* in the records it generates. Execution data includes fields representing the event batch payload, as well as the request sent to and the response from the enrichment and target.

Execution data is useful for troubleshooting and debugging. The `payload` field contains the actual contents of each event included in the batch, enabling you to correlate individual events to a specific pipe execution.

If you choose to include execution data, it is included for all log destinations specified for the pipe.

**Important**  
These fields may contain sensitive information. EventBridge makes no attempt to redact the contents of these fields during logging.

When including execution data, EventBridge adds the following fields to the relevant records: 
+ **`payload`**

  Represents the contents of the event batch being processed by the pipe.

  EventBridge includes the `payload` field in records generated at steps where the event batch contents may have been updated. This includes the following steps:
  + `EXECUTION_STARTED`
  + `ENRICHMENT_TRANSFORMATION_SUCCEEDED`
  + `ENRICHMENT_STAGE_SUCCEEDED`
  + `TARGET_TRANSFORMATION_SUCCEEDED`
  + `TARGET_STAGE_SUCCEEDED`
+ **`awsRequest`**

  Represents the request sent to the enrichment or target as a JSON string. For requests sent to an API destination, this represents the HTTP request sent to that endpoint.

  EventBridge includes the `awsRequest` field in records generated at the final steps of enrichment and targeting; that is, after EventBridge has executed or attempted to execute the request against the specified enrichment or target service. This includes the following steps:
  + `ENRICHMENT_INVOCATION_FAILED`
  + `ENRICHMENT_INVOCATION_SUCCEEDED`
  + `TARGET_INVOCATION_FAILED`
  + `TARGET_INVOCATION_PARTIALLY_FAILED`
  + `TARGET_INVOCATION_SUCCEEDED`
+ **`awsResponse`**

  Represents the response returned by the enrichment or target, in JSON format. For requests sent to an API destination, this represents the HTTP response returned from that endpoint.

  As with `awsRequest`, EventBridge includes the `awsResponse` field in records generated at the final steps of enrichment and targeting; that is, after EventBridge has executed or attempted to execute a request against the specified enrichment or target service and received a response. This includes the following steps:
  + `ENRICHMENT_INVOCATION_FAILED`
  + `ENRICHMENT_INVOCATION_SUCCEEDED`
  + `TARGET_INVOCATION_FAILED`
  + `TARGET_INVOCATION_PARTIALLY_FAILED`
  + `TARGET_INVOCATION_SUCCEEDED`

For a discussion of pipe execution steps, see [EventBridge Pipes execution steps](eb-pipes-logs-execution-steps.md).

### Truncating execution data in EventBridge Pipes log records
<a name="eb-pipes-logs-execution-data-truncation"></a>

If you choose to have EventBridge include execution data in a pipe's log records, there is a possibility that a record may exceed the 256 KB size limit. To prevent this, EventBridge automatically truncates the execution data fields, in the following order. EventBridge truncates each field entirely before progressing to truncate the next field. EventBridge truncates field data simply by removing characters from the end of the data string; no attempt is made to truncate based on data importance, and truncation will invalidate JSON formatting.
+ `payload`
+ `awsRequest`
+ `awsResponse`

If EventBridge does truncate fields in the event, the `truncatedFields` field includes a list of the truncated data fields.
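The truncation order can be sketched as follows. The 256 KB limit and the field order come from the text above; the trimming itself is simplified here to emptying each field, rather than removing characters from the end:

```python
import json

MAX_RECORD_BYTES = 256 * 1024
TRUNCATION_ORDER = ["payload", "awsRequest", "awsResponse"]

def truncate_record(record: dict, limit: int = MAX_RECORD_BYTES) -> dict:
    """Trim execution-data fields in the documented order until the
    record fits; each field is emptied before the next is touched, and
    every truncated field is listed in `truncatedFields`."""
    record = dict(record)
    record["truncatedFields"] = []
    for field in TRUNCATION_ORDER:
        if len(json.dumps(record)) <= limit:
            break  # the record already fits
        if record.get(field):
            record[field] = ""  # simplified: drop the field's data entirely
            record["truncatedFields"].append(field)
    return record
```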

## Error reporting in EventBridge Pipes log records
<a name="eb-pipes-logs-errors"></a>

EventBridge also includes error data, where available, in pipe execution steps that represent failure states. These steps include:
+ `ExecutionThrottled`
+ `ExecutionTimeout`
+ `ExecutionFailed`
+ `ExecutionPartiallyFailed`
+ `EnrichmentTransformationFailed`
+ `EnrichmentInvocationFailed`
+ `EnrichmentStageFailed`
+ `TargetTransformationFailed`
+ `TargetInvocationFailed`
+ `TargetInvocationPartiallyFailed`
+ `TargetStageFailed`
+ `TargetStagePartiallyFailed`

# EventBridge Pipes execution steps
<a name="eb-pipes-logs-execution-steps"></a>

Understanding the flow of pipe execution steps can aid you in troubleshooting or debugging your pipe's performance using logs.

A pipe *execution* is an event or batch of events received by a pipe that travels to an enrichment or target. If enabled, EventBridge generates a log record for each execution step it performs as the event batch is processed. 

At a high level, the execution contains two *stages*, or collection of steps: enrichment, and target. Each of these stages consists of transformation and invocation steps.

The main steps of a successful pipe execution follows this flow:
+ The pipe execution starts.
+ The execution enters the enrichment stage if you have specified an enrichment for the events. If you haven't specified an enrichment, the execution proceeds to the target stage.

  In the enrichment stage, the pipe performs any transformation you have specified, then invokes the enrichment.
+ In the target stage, the pipe performs any transformation you have specified, then invokes the target. 

  If you haven't specified transformation or target, the execution skips the target stage.
+ The pipe execution completes successfully.

The diagram below demonstrates this flow. Diverging paths are formatted as dotted lines.

![\[A pipe execution including enrichment and target stages, with transformation and invocation steps.\]](http://docs.aws.amazon.com/eventbridge/latest/userguide/images/pipes-logging-overview_eventbridge_architecture.svg)


The diagram below presents a detailed view of the pipe execution flow, with all possible execution steps represented. Again, diverging paths are formatted as dotted lines.

For a complete list of pipe execution steps, see [Specifying EventBridge Pipes log level](eb-pipes-logs.md#eb-pipes-logs-level).

![\[The pipe execution flow, including all stages and steps with all possible outcomes.\]](http://docs.aws.amazon.com/eventbridge/latest/userguide/images/pipes-logging-detailed_eventbridge_architecture.svg)


Note that target invocation may result in a partial failure of the batch. For more information, see [Batching behavior](eb-pipes-batching-concurrency.md#pipes-batching).

# EventBridge Pipes log schema reference
<a name="eb-pipes-logs-schema"></a>

The following reference details the schema for EventBridge Pipes log records.

Each log record represents a pipe execution step, and may contain up to 10,000 events if the pipe source and target have been configured for batching.

For more information, see [Logging Amazon EventBridge Pipes performance](eb-pipes-logs.md).

```
{
    "executionId": "guid",
    "timestamp": "date_time",
    "messageType": "execution_step",
    "resourceArn": "arn:aws:pipes:region:account:pipe/pipe-name",
    "logLevel": "TRACE | INFO | ERROR",
    "payload": "{}",
    "awsRequest": "{}",
    "awsResponse": "{}",
    "truncatedFields": ["awsRequest","awsResponse","payload"],
    "error": {
        "httpStatusCode": code,
        "message": "error_message",
        "details": "",
        "awsService": "service_name",
        "requestId": "service_request_id"
    }
}
```

**executionId**  <a name="pipe-log-schema-execution-id"></a>
The ID of the pipe execution.  
A pipe execution is an event or batch of events received by a pipe that travels to an enrichment or target. For more information, see [How Amazon EventBridge Pipes logging works](eb-pipes-logs.md#eb-pipes-logs-overview).

**timestamp**  <a name="pipe-log-schema-timestamp"></a>
The date and time the log event was emitted.  
Unit: millisecond

**messageType**  <a name="pipe-log-schema-message-type"></a>
The pipe execution step for which the record was generated.  
For more information on pipe execution steps, see [EventBridge Pipes execution steps](eb-pipes-logs-execution-steps.md).

**resourceArn**  <a name="pipe-log-schema-resource-arn"></a>
The Amazon Resource Name (ARN) for the pipe.

**logLevel**  <a name="pipe-log-schema-loglevel"></a>
The level of detail specified for the pipe log.  
*Valid values*: `ERROR` \| `INFO` \| `TRACE`  
For more information, see [Specifying EventBridge Pipes log level](eb-pipes-logs.md#eb-pipes-logs-level).

**payload**  <a name="pipe-log-schema-payload"></a>
The contents of the event batch being processed by the pipe.  
EventBridge includes this field only if you have specified to include execution data in the logs for this pipe. This field may contain sensitive information; EventBridge makes no attempt to redact its contents during logging.  
For more information, see [Including execution data in EventBridge Pipes logs](eb-pipes-logs.md#eb-pipes-logs-execution-data).

**awsRequest**  <a name="pipe-log-schema-aws-request"></a>
The request sent to the enrichment or target, in JSON format. For requests sent to an API destination, this represents the HTTP request sent to that endpoint.  
EventBridge includes this field only if you have specified to include execution data in the logs for this pipe. This field may contain sensitive information; EventBridge makes no attempt to redact its contents during logging.  
For more information, see [Including execution data in EventBridge Pipes logs](eb-pipes-logs.md#eb-pipes-logs-execution-data).

**awsResponse**  <a name="pipe-log-schema-aws-response"></a>
The response returned by the enrichment or target, in JSON format. For requests sent to an API destination, this represents the HTTP response returned from that endpoint, and not the response returned by the API Destination service itself.  
EventBridge includes this field only if you have specified to include execution data in the logs for this pipe. This field may contain sensitive information; EventBridge makes no attempt to redact its contents during logging.  
For more information, see [Including execution data in EventBridge Pipes logs](eb-pipes-logs.md#eb-pipes-logs-execution-data).

**truncatedFields**  <a name="pipe-log-schema-truncated-fields"></a>
A list of any execution data fields EventBridge has truncated to keep the record below the 256 KB size limitation.  
If EventBridge did not have to truncate any of the execution data fields, this field is present but `null`.  
For more information, see [Truncating execution data in EventBridge Pipes log records](eb-pipes-logs.md#eb-pipes-logs-execution-data-truncation).
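A sketch of this truncation behavior, for illustration only: how EventBridge actually rewrites a truncated field is not specified here, so this sketch simply empties each execution-data field in turn until the serialized record fits under the limit.

```python
import json

MAX_RECORD_BYTES = 256 * 1024  # record size limit cited above

def truncate_execution_data(record, fields=("payload", "awsRequest", "awsResponse")):
    """Illustrative sketch: empty execution-data fields one at a time
    until the serialized record fits, tracking what was truncated."""
    truncated = []
    for field in fields:
        if len(json.dumps(record).encode("utf-8")) <= MAX_RECORD_BYTES:
            break  # record already fits; stop truncating
        if record.get(field):
            record[field] = ""
            truncated.append(field)
    # Mirror the schema: the field is present but null when nothing was truncated.
    record["truncatedFields"] = truncated or None
    return record
```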

**error**  <a name="pipe-log-schema-error"></a>
Contains information for any error generated during this pipe execution step.   
If no error was generated during this pipe execution step, this field is present but `null`.    
**httpStatusCode**  <a name="pipe-log-schema-http-status-code"></a>
The HTTP status code returned by the called service.  
**message**  <a name="pipe-log-schema-message"></a>
The error message returned by the called service.  
**details**  <a name="pipe-log-schema-details"></a>
Any detailed error information returned by the called service.  
**awsService**  <a name="pipe-log-schema-aws-service"></a>
The name of the service called.  
**requestId**  <a name="pipe-log-schema-request-id"></a>
The request ID for this request from the called service.
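Putting the schema together, a log consumer might summarize records as in this sketch (field names come from the schema above; `summarize_pipe_log` itself is illustrative, and it tolerates the nullable `error` and `truncatedFields` fields as described):

```python
import json

def summarize_pipe_log(record_json):
    """Summarize one pipe log record, handling the nullable
    'error' and 'truncatedFields' fields described above."""
    rec = json.loads(record_json)
    summary = {
        "execution": rec["executionId"],
        "step": rec["messageType"],
        "level": rec["logLevel"],
    }
    if rec.get("error"):  # present but null when no error occurred
        summary["failed"] = True
        summary["why"] = rec["error"].get("message")
    if rec.get("truncatedFields"):  # present but null when nothing was truncated
        summary["truncated"] = rec["truncatedFields"]
    return summary
```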

# Logging and monitoring Amazon EventBridge Pipes using Amazon CloudWatch Logs
<a name="eb-pipes-monitoring"></a>

You can log EventBridge Pipes API calls using AWS CloudTrail and monitor the health of your pipes using Amazon CloudWatch metrics.

## CloudWatch metrics
<a name="pipes-monitoring-cloudwatch"></a>

EventBridge Pipes sends metrics to Amazon CloudWatch every minute, covering everything from pipe executions being throttled to targets being successfully invoked.


| Metric | Description | Dimensions | Units | 
| --- | --- | --- | --- | 
|  `Concurrency`  |  The number of concurrent executions of a pipe.  | AwsAccountId | None | 
| `Duration` |  Length of time the pipe execution took.  | PipeName | Milliseconds | 
|  `EventCount`  |  The number of events a pipe has processed.  | PipeName | None | 
|  `EventSize`  |  The size of the payload of the event that invoked the pipe.  | PipeName | Bytes | 
|  `ExecutionThrottled`  |  How many executions of a pipe were throttled.  This value will be `0` if no executions were throttled.   | AwsAccountId, PipeName | None | 
|  `ExecutionTimeout`  |  How many executions of a pipe timed out before completing execution.  This value will be `0` if no executions timed out.   | PipeName | None | 
|  `ExecutionFailed`  |  How many executions of a pipe failed.  This value will be `0` if no executions failed.   | PipeName | None | 
|  `ExecutionPartiallyFailed`  |  How many executions of a pipe partially failed.  This value will be `0` if no executions partially failed.   | PipeName | None | 
|  `EnrichmentStageDuration`  |  How long the enrichment stage took to complete.  | PipeName | Milliseconds | 
|  `EnrichmentStageFailed`  |  How many executions of a pipe's enrichment stage failed.  This value will be `0` if no executions failed.   | PipeName | None | 
| `Invocations` |  Total number of invocations.  | AwsAccountId, PipeName | None | 
|  `TargetStageDuration`  |  How long the target stage took to complete.  | PipeName | Milliseconds | 
|  `TargetStageFailed`  |  How many executions of a pipe's target stage failed.  This value will be `0` if no executions failed.   | PipeName | None | 
| `TargetStagePartiallyFailed` | How many executions of a pipe's target stage partially failed.  This value will be `0` if no target stage executions partially failed.   | PipeName | None | 
| `TargetStageSkipped` | How many executions of a pipe's target stage were skipped (for example, due to the enrichment returning an empty payload).  | PipeName | Count | 

## Dimensions for CloudWatch metrics
<a name="pipes-monitoring-cloudwatch-dimensions"></a>

CloudWatch metrics have *dimensions*, or sortable attributes, which are listed below.


|  Dimension  |  Description  | 
| --- | --- | 
|  AwsAccountId  |  Filters the available metrics by account ID.  | 
|  PipeName  |  Filters the available metrics by pipe name.  | 

# Amazon EventBridge Pipes error handling and troubleshooting
<a name="eb-pipes-error-troubleshooting"></a>

Understanding the types of errors EventBridge Pipes may encounter, and how EventBridge handles those errors, can help you troubleshoot issues with your pipes.

## Retry behavior and error handling
<a name="eb-pipes-error-handling"></a>

EventBridge Pipes automatically retries enrichment and target invocation on any retryable AWS failures with the source service, the enrichment or target services, or EventBridge. However, if there are failures returned by enrichment or target customer implementations, the pipe polling throughput will gradually back off. For nearly continuous 4xx errors (such as authorization problems with IAM or missing resources), the pipe can be automatically disabled with an explanatory message in the `StateReason`.

## Pipe invocation errors and retry behavior
<a name="eb-pipes-error-invoke"></a>

When you invoke a pipe, two main types of errors can occur: *pipe internal errors* and *customer invocation errors*.

### Pipe internal errors
<a name="eb-pipes-error-invoke-internal"></a>

Pipe internal errors are errors resulting from aspects of the invocation managed by the EventBridge Pipes service. 

These types of errors can include issues such as:
+ An HTTP connection failure when attempting to invoke the customer target service.
+ A transient drop in availability of the pipe service itself.

In general, EventBridge Pipes retries internal errors an indefinite number of times, and stops only when the record expires in the source. 

For pipes with a stream source, EventBridge Pipes does not count retries for internal errors against the maximum number of retries specified on the retry policy for the stream source. For pipes with an Amazon SQS source, EventBridge Pipes does not count retries for internal errors against the maximum receive count for the Amazon SQS source. 

### Customer invocation errors
<a name="eb-pipes-error-invoke-customer"></a>

Customer invocation errors are errors resulting from configuration or code managed by the user. 

These types of errors can include issues such as:
+ Insufficient permissions on the pipe to invoke the target.
+ A logic error in a synchronously invoked customer Lambda function, Step Functions state machine, API destination, or API Gateway endpoint.

For customer invocation errors, EventBridge Pipes does the following:
+ For pipes with a stream source, EventBridge Pipes retries up to the maximum retry times configured on the pipe retry policy or until the maximum record age expires, whichever comes first.
+ For pipes with an Amazon SQS source, EventBridge Pipes retries a customer error up to the maximum receive count on the source queue.
+ For pipes with an Apache Kafka or Amazon MQ source, EventBridge retries customer errors the same as it retries internal errors.

For pipes with compute targets, you must invoke the pipe synchronously in order for EventBridge Pipes to be aware of any runtime errors that are thrown from the customer compute logic and retry on such errors. Pipes cannot retry on errors thrown from the logic of a Step Functions standard workflow, as this target must be invoked asynchronously.

For Amazon SQS and stream sources, such as Kinesis and DynamoDB, EventBridge Pipes supports partial batch failure handling of target failures. For more information, see [Partial batch failure](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure).
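As a sketch of what partial batch failure handling looks like from a target's side, assuming a Lambda target that reports failures with the `batchItemFailures` convention (the `process` logic and the "poison" record below are hypothetical, for demonstration only):

```python
def process(record):
    """Placeholder business logic (an assumption for this sketch):
    fail on records whose body is 'poison'."""
    if record.get("body") == "poison":
        raise ValueError("cannot process record")

def handler(event, context=None):
    """Lambda-style target handler: report only the failed records by
    identifier, so only those records are retried rather than the
    whole batch."""
    failures = []
    for record in event:  # the pipe delivers the batch as a list of records
        try:
            process(record)
        except Exception:
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

With this response shape, a successful record in a mixed batch is not redelivered; only the failed identifiers go back for retry.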

## Amazon SQS source retry timing with maxBatchWindowInSeconds
<a name="eb-pipes-sqs-retry-timing"></a>

When using an Amazon SQS queue as a pipe source, the retry timing behavior differs depending on whether you configure the `maxBatchWindowInSeconds` parameter:
+ **Without maxBatchWindowInSeconds** – The SQS poller uses the queue's visibility timeout setting for retries. When processing fails, messages remain hidden for the visibility timeout duration before becoming available for retry. To reduce retry delays, configure the queue's visibility timeout to an appropriate value for your use case.
+ **With maxBatchWindowInSeconds** – The SQS poller dynamically sets the visibility timeout for polled messages using the formula: `functionTimeout + maxBatchWindowInSeconds + 30 seconds`. For EventBridge Pipes, the function timeout is 7 minutes, resulting in a visibility timeout of approximately 7.5 minutes plus your configured `maxBatchWindowInSeconds` value. When processing fails, messages remain hidden for this extended duration before becoming available for retry.
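The formula above can be checked with a small calculation (the 7-minute function timeout is the value stated above for EventBridge Pipes):

```python
PIPES_FUNCTION_TIMEOUT_SECONDS = 7 * 60  # 7 minutes, per the formula above

def effective_visibility_timeout(max_batch_window_in_seconds):
    """Visibility timeout the SQS poller applies when
    maxBatchWindowInSeconds is set:
    functionTimeout + maxBatchWindowInSeconds + 30 seconds."""
    return PIPES_FUNCTION_TIMEOUT_SECONDS + max_batch_window_in_seconds + 30
```

For example, a 60-second batch window yields a retry delay of 420 + 60 + 30 = 510 seconds (8.5 minutes) for failed messages.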

This behavior is particularly relevant when using [partial batch responses](eb-pipes-batching-concurrency.md#pipes-partial-batch-failure). If you require faster retry timing, avoid setting `maxBatchWindowInSeconds` and instead rely on the queue's configured visibility timeout.

## Pipe DLQ behavior
<a name="eb-pipes-dlq-behavior"></a>

A pipe inherits dead-letter queue (DLQ) behavior from the source: 
+ If the source Amazon SQS queue has a configured DLQ, messages are automatically delivered there after the specified number of attempts. 
+ For stream sources, such as DynamoDB and Kinesis streams, you can configure a DLQ for the pipe and route failed events to it. DynamoDB and Kinesis stream sources support Amazon SQS queues and Amazon SNS topics as DLQ targets.

If you specify a `DeadLetterConfig` for a pipe with a Kinesis or DynamoDB source, make sure that the `MaximumRecordAgeInSeconds` property on the pipe is less than the `MaximumRecordAge` of the source event. `MaximumRecordAgeInSeconds` controls when the pipe poller gives up on the event and delivers it to the DLQ, while `MaximumRecordAge` controls how long the message remains visible in the source stream before it gets deleted. Therefore, set `MaximumRecordAgeInSeconds` to a value that is less than the source `MaximumRecordAge`, so that there is adequate time between when the event is sent to the DLQ and when the source automatically deletes it for you to determine why the event went to the DLQ.
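The relationship above can be expressed as a small configuration check (an illustrative helper, not part of any AWS SDK; the return value is the inspection window left between DLQ delivery and stream deletion):

```python
def validate_dlq_window(pipe_max_record_age_seconds, stream_retention_seconds):
    """Check the guidance above: the pipe must give up on a record
    (sending it to the DLQ) before the source stream deletes it."""
    if pipe_max_record_age_seconds >= stream_retention_seconds:
        raise ValueError(
            "MaximumRecordAgeInSeconds must be less than the source "
            "stream's MaximumRecordAge so DLQ'd events can be "
            "inspected before the stream deletes them"
        )
    # Seconds available to inspect a DLQ'd event before source deletion.
    return stream_retention_seconds - pipe_max_record_age_seconds
```

For example, a pipe `MaximumRecordAgeInSeconds` of one hour against a 24-hour stream retention leaves a 23-hour inspection window.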

The `MaximumRecordAgeInSeconds` parameter applies independently of retry behavior. When polling a stream source, if a record's age exceeds the `MaximumRecordAgeInSeconds` value, EventBridge Pipes will not process that record, regardless of whether a retry situation exists. These records are sent directly to the DLQ (if configured) without any processing attempt.

For Amazon MQ sources, the DLQ can be configured directly on the message broker.

EventBridge Pipes does not support first-in first-out (FIFO) DLQs for stream sources.

EventBridge Pipes does not support DLQs for Amazon MSK and self-managed Apache Kafka stream sources.

## Pipe failure states
<a name="eb-pipes-failure-states"></a>

Creating, deleting, and updating pipes are asynchronous operations that might result in a failure state. Likewise, a pipe might be automatically disabled due to failures. In all cases, the pipe `StateReason` provides information to help troubleshoot the failure.

The following is a sample of the possible `StateReason` values:
+ Stream not found. To resume processing please delete the pipe and create a new one.
+ Pipes does not have required permissions to perform Queue operations (sqs:ReceiveMessage, sqs:DeleteMessage and sqs:GetQueueAttributes)
+ Connection error. Your VPC must be able to connect to pipes. You can provide access by configuring a NAT Gateway or a VPC Endpoint to pipes-data. For how to setup NAT gateway or VPC Endpoint to pipes-data, please check AWS documentation.
+ MSK cluster does not have security groups associated with it

A pipe may be automatically stopped with an updated `StateReason`. Possible reasons include:
+ A Step Functions standard workflow configured as an [enrichment](pipes-enrichment.md).
+ A Step Functions standard workflow configured as a target to be [invoked synchronously](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html#pipes-invocation).

## Custom encryption failures
<a name="eb-pipes-error-handling-cms"></a>

If you configure a source to use an AWS KMS customer managed key (CMK), rather than an AWS managed AWS KMS key, you must explicitly give your pipe's execution role decryption permission. To do so, include the following additional permission in the CMK policy:

```
  {
      "Sid": "Allow Pipes access",
      "Effect": "Allow",
      "Principal": {
          "AWS": "arn:aws:iam::01234567890:role/service-role/Amazon_EventBridge_Pipe_DDBStreamSourcePipe_12345678"
      },
      "Action": "kms:Decrypt",
      "Resource": "*"
  }
```

Replace the role ARN above with your pipe's execution role ARN.

Next, ensure that the same AWS KMS permissions are added to your pipe's execution role.

This applies to all pipe sources encrypted with an AWS KMS CMK, including:
+ Amazon DynamoDB Streams
+ Amazon Kinesis Data Streams
+ Amazon MQ
+ Amazon MSK
+ Amazon SQS

# Tutorial: Create an EventBridge pipe that filters source events
<a name="pipes-tutorial-create-dynamodb-sqs"></a>

In this tutorial, you'll create a pipe that connects a DynamoDB stream source to an Amazon SQS queue target. This includes specifying an event pattern for the pipe to use when filtering events to deliver to the queue. You'll then test the pipe to ensure that only the desired events are being delivered.

## Prerequisites: Create the source and target
<a name="pipes-tutorial-create-dynamodb-sqs-prereqs"></a>

Before you create the pipe, you need to create the source and target that the pipe will connect: an Amazon DynamoDB stream to act as the pipe source, and an Amazon SQS queue as the pipe target.

To simplify this step, you can use AWS CloudFormation to deploy the source and target resources. To do this, you'll create a CloudFormation template defining the following resources:
+ The pipe source 

  An Amazon DynamoDB table, named `pipe-tutorial-source`, with a stream enabled to provide an ordered flow of information about changes to items in the DynamoDB table.
+ The pipe target 

  An Amazon SQS queue, named `pipe-tutorial-target`, to receive the DynamoDB stream of events from your pipe.

**To create the CloudFormation template for provisioning pipe resources**

1. Copy the JSON template text in the [CloudFormation template for generating prerequisites](#pipes-tutorial-create-cfn-template) section, below.

1. Save the template as a JSON file (for example, `~/pipe-tutorial-resources.json`).

Next, use the template file you just created to provision a CloudFormation stack.

**Note**  
Once you create your CloudFormation stack, you will be charged for the AWS resources it provisions.

**Provision the tutorial prerequisites using the AWS CLI**
+ Run the following CLI command, where `--template-body` specifies the location of your template file:

  ```
  aws cloudformation create-stack --stack-name pipe-tuturial-resources --template-body file://~/pipe-tutorial-resources.json
  ```

**Provision tutorial prerequisites using the CloudFormation console**

1. Open the CloudFormation console at [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation/).

1. Select **Stacks**, then select **Create stack**, and choose **with new resources (standard)**.

   CloudFormation displays the **Create stack** wizard.

1. For **Prerequisite - Prepare template**, leave the default, **Template is ready**, selected.

1. Under **Specify template**, select **Upload a template file**, and then choose the file and select **Next**.

1. Configure the stack and the resources it will provision:
   + For **Stack name**, enter `pipe-tuturial-resources`.
   + For **Parameters**, leave the default names for the DynamoDB table and Amazon SQS queue.
   + Choose **Next**.

1. Choose **Next**, then choose **Submit**.

   CloudFormation creates the stack and provisions the resources defined in the template.

For more information about CloudFormation, see [What is CloudFormation?](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html) in the *CloudFormation User Guide*.

## Step 1: Create the pipe
<a name="pipes-tutorial-create-dynamodb-sqs-create-pipe"></a>

With the pipe source and target provisioned, you can now create the pipe to connect the two services.

**Create the pipe using the EventBridge console**

1. Open the Amazon EventBridge console at [https://console.aws.amazon.com/events/](https://console.aws.amazon.com/events/).

1. On the navigation pane, choose **Pipes**.

1. Choose **Create pipe**.

1. For **Name**, name your pipe `pipe-tutorial`.

1. Specify the DynamoDB data stream source:

   1. Under **Details**, for **Source**, select **DynamoDB data stream**.

      EventBridge displays DynamoDB-specific source configuration settings.

   1. For **DynamoDB stream**, select `pipe-tutorial-source`.

      Leave **Starting position** set to the default, `Latest`.

   1. Choose **Next**.

1. Specify and test an event pattern to filter events:

   Filtering enables you to control which events the pipe sends to enrichment or the target. The pipe sends only events that match the event pattern on to enrichment or the target.

   For more information, see [Event filtering in Amazon EventBridge Pipes](eb-pipes-event-filtering.md).
**Note**  
You are only billed for those events sent to enrichment or the target.

   1. Under **Sample event - *optional***, leave **AWS events** selected, and make sure that **DynamoDB Stream Sample event 1** is selected.

      This is the sample event you'll use to test your event pattern.

   1. Under **Event pattern**, enter the following event pattern:

      ```
      {
        "eventName": ["INSERT", "MODIFY"]
      }
      ```

   1. Choose **Test pattern**.

      EventBridge displays a message that the sample event matches the event pattern. This is because the sample event has an `eventName` value of `INSERT`.

   1. Choose **Next**.

1. Choose **Next** to skip specifying an enrichment. 

   In this example, you won’t select an enrichment. Enrichments enable you to select a service to enhance the data from the source before sending it to the target. For more details, see [Event enrichment in Amazon EventBridge Pipes](pipes-enrichment.md).

1. Specify your Amazon SQS queue as the pipe target:

   1. Under **Details**, for **Target service**, select **Amazon SQS queue**.

   1. For **Queue**, select `pipe-tutorial-target`.

   1. Leave the **Target Input transformer** section empty.

      For more information, see [Amazon EventBridge Pipes input transformation](eb-pipes-input-transformation.md).

1. Choose **Create pipe**.

   EventBridge creates the pipe and displays the pipe detail page. The pipe is ready once its status updates to `Running`.

## Step 2: Confirm the pipe filters events
<a name="pipes-tutorial-create-dynamodb-sqs-test"></a>

The pipe is now set up, but it has yet to receive events from the table.

To test the pipe, you'll update entries in the DynamoDB table. Each update generates events that the DynamoDB stream sends to your pipe. Some will match the event pattern you specified, and some will not. You can then examine the Amazon SQS queue to confirm that the pipe delivered only those events that matched your event pattern.

**Update table items to generate events**

1. Open the DynamoDB console at [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/).

1. From the left navigation, select **Tables**. Select the `pipe-tutorial-source` table.

   DynamoDB displays the table details page for `pipe-tutorial-source`.

1. Select **Explore table items**, and then choose **Create item**.

   DynamoDB displays the **Create item** page.

1. Under **Attributes**, create a new table item:

   1. For **Album** enter `Album A`.

   1. For **Artist** enter `Artist A`.

   1. Choose **Create item**.

1. Update the table item:

   1. Under **Items returned**, choose **Album A**.

   1. Select **Add new attribute**, then select **String**.

   1. Enter a new attribute name of `Song`, with a value of `Song A`.

   1. Choose **Save changes**.

1. Delete the table item:

   1. Under **Items returned**, check **Album A**.

   1. From the **Actions** menu, select **Delete items**.

You have made three updates to the table item; this generates three events for the DynamoDB data stream:
+ An `INSERT` event when you created the item.
+ A `MODIFY` event when you added an attribute to the item.
+ A `REMOVE` event when you deleted the item.

However, the event pattern you specified for the pipe should filter out any events that are not `INSERT` or `MODIFY` events. Next, confirm that the pipe delivered the expected events to the queue.
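The filtering you configured can be sketched locally. This simplified matcher covers only the list-of-allowed-values pattern form used in this tutorial, not the full EventBridge event pattern language:

```python
def matches_pattern(event, pattern):
    """Simplified matcher: each pattern key maps to a list of allowed
    values, and the event must match every key."""
    return all(event.get(key) in allowed for key, allowed in pattern.items())

# The tutorial's event pattern and the three events the table updates generate.
pattern = {"eventName": ["INSERT", "MODIFY"]}
stream_events = [{"eventName": "INSERT"}, {"eventName": "MODIFY"}, {"eventName": "REMOVE"}]

# Only INSERT and MODIFY survive filtering; REMOVE is dropped.
delivered = [e for e in stream_events if matches_pattern(e, pattern)]
```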

**Confirm the expected events were delivered to the queue**

1. Open the Amazon SQS console at [https://console.aws.amazon.com/sqs/](https://console.aws.amazon.com/sqs/).

1. Choose the `pipe-tutorial-target` queue.

   Amazon SQS displays the queue details page.

1. Select **Send and receive messages**, then under **Receive messages** choose **Poll for messages**.

   The console polls the queue and then lists the events it receives.

1. Choose the event name to see the event JSON that was delivered.

 There should be two events in the queue: one with an `eventName` of `INSERT`, and one with an `eventName` of `MODIFY`. However, the pipe did not deliver the event for deleting the table item, since that event had an `eventName` of `REMOVE`, which did not match the event pattern you specified in the pipe.

## Step 3: Clean up your resources
<a name="pipes-tutorial-create-dynamodb-sqs-cleanup"></a>

First, delete the pipe itself.

**Delete the pipe using the EventBridge console**

1. Open the Amazon EventBridge console at [https://console.aws.amazon.com/events/](https://console.aws.amazon.com/events/).

1. On the navigation pane, choose **Pipes**.

1. Select the `pipe-tutorial` pipe, and choose **Delete**.

Then delete the CloudFormation stack to avoid being billed for the continued use of the resources it provisions.

**Delete the tutorial prerequisites using the AWS CLI**
+ Run the following CLI command, where `--stack-name` specifies the name of your stack:

  ```
  aws cloudformation delete-stack --stack-name pipe-tuturial-resources
  ```

**Delete the tutorial prerequisites using the CloudFormation console**

1. Open the CloudFormation console at [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation/).

1. On the **Stacks** page, select the stack and then select **Delete**.

1. Select **Delete** to confirm your action.

## CloudFormation template for generating prerequisites
<a name="pipes-tutorial-create-cfn-template"></a>

Use the JSON below to create a CloudFormation template for provisioning the source and target resources necessary for this tutorial.

```
{
  "AWSTemplateFormatVersion": "2010-09-09",

  "Description" : "Provisions resources to use with the EventBridge Pipes tutorial. You will be billed for the AWS resources used if you create a stack from this template.",

  "Parameters" : {
    "SourceTableName" : {
      "Type" : "String",
      "Default" : "pipe-tutorial-source",
      "Description" : "Specify the name of the table to provision as the pipe source, or accept the default."
    },
    "TargetQueueName" : {
      "Type" : "String",
      "Default" : "pipe-tutorial-target",
      "Description" : "Specify the name of the queue to provision as the pipe target, or accept the default."
    }
  },
  "Resources": {
    "PipeTutorialSourceDynamoDBTable": {
      "Type": "AWS::DynamoDB::Table",
      "Properties": {
        "AttributeDefinitions": [{
            "AttributeName": "Album",
            "AttributeType": "S"
          },
          {
            "AttributeName": "Artist",
            "AttributeType": "S"
          }

        ],
        "KeySchema": [{
            "AttributeName": "Album",
            "KeyType": "HASH"

          },
          {
            "AttributeName": "Artist",
            "KeyType": "RANGE"
          }
        ],
        "ProvisionedThroughput": {
          "ReadCapacityUnits": 10,
          "WriteCapacityUnits": 10
        },
        "StreamSpecification": {
          "StreamViewType": "NEW_AND_OLD_IMAGES"
        },
        "TableName": { "Ref" : "SourceTableName" }
      }
    },
    "PipeTutorialTargetQueue": {
      "Type": "AWS::SQS::Queue",
      "Properties": {
        "QueueName": { "Ref" : "TargetQueueName" }
      }
    }
  }
}
```

# Generating a CloudFormation template from EventBridge Pipes
<a name="pipes-generate-template"></a>

AWS CloudFormation enables you to configure and manage your AWS resources across accounts and regions in a centralized and repeatable manner by treating infrastructure as code. CloudFormation does this by letting you create *templates*, which define the resources you want to provision and manage.

EventBridge enables you to generate templates from the existing pipes in your account, as an aid to help you jumpstart developing CloudFormation templates. You can select a single pipe or multiple pipes to include in the template. You can then use these templates as the basis for [creating stacks](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cfn-console-create-stack.html) of resources under CloudFormation management.

For more information on CloudFormation, see the [*CloudFormation User Guide*](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html).

You can also generate CloudFormation templates from [event buses](eb-generate-event-bus-template.md) and [event bus rules](rule-generate-template.md).

## Resources included in EventBridge Pipe templates
<a name="pipes-generate-template-resources"></a>

When EventBridge generates the CloudFormation template, it creates an [AWS::Pipes::Pipe](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html) resource for each selected pipe. In addition, EventBridge includes the following resources under the described conditions:
+ [AWS::Events::ApiDestination](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-apidestination.html)

  If your pipes include API destinations, either as enrichments or targets, EventBridge includes them in the CloudFormation template as AWS::Events::ApiDestination resources.
+ [AWS::Events::EventBus](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-events-eventbus.html)

  If your pipes include an event bus as a target, EventBridge includes it in the CloudFormation template as an AWS::Events::EventBus resource.
+ [AWS::IAM::Role](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html)

  If you had EventBridge create a new execution role when you [configured the pipe](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-create.html#pipes-configure-pipe-settings), you can choose to have EventBridge include that role in the template as an AWS::IAM::Role resource. EventBridge does not include roles you create yourself. (In either case, the `RoleArn` property of the AWS::Pipes::Pipe resource contains the ARN of the role.)

## Considerations when using CloudFormation templates generated from EventBridge Pipes
<a name="pipes-generate-template-considerations"></a>

Consider the following factors when using a CloudFormation template you generated from EventBridge:
+ EventBridge does not include any passwords in the generated template.

  You can edit the template to include [template parameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html) that enable users to specify passwords or other sensitive information when using the template to create or update a CloudFormation stack.

  In addition, users can use Secrets Manager to create a secret in the desired Region and then edit the generated template to use [dynamic references](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/dynamic-references.html#dynamic-references-secretsmanager).
+ Targets in the generated template remain exactly as they were specified in the original pipe, including any Region-specific ARNs. This can lead to cross-Region issues if you do not edit the template appropriately before using it to create stacks in other Regions.

  Additionally, the generated template does not create the downstream targets automatically.
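For instance, one way to supply a password is a `NoEcho` template parameter, with a Secrets Manager dynamic reference as an alternative. The resource below is a hypothetical API destination connection; the parameter, user, and secret names are placeholders, not values from a generated template:

```yaml
Parameters:
  # Prompt for the password at stack creation instead of
  # storing it in the template.
  ApiPassword:
    Type: String
    NoEcho: true

Resources:
  MyApiConnection:
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: BASIC
      AuthParameters:
        BasicAuthParameters:
          Username: my-user
          Password: !Ref ApiPassword
          # Alternatively, resolve the value from a Secrets Manager
          # secret in the stack's Region at deployment time:
          # Password: '{{resolve:secretsmanager:MyPipeSecret:SecretString:password}}'
```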

## Generating a CloudFormation template from EventBridge Pipes
<a name="pipes-generate-template-console"></a>

To generate a CloudFormation template from one or more pipes using the EventBridge console, do the following: 

**To generate a CloudFormation template from one or more pipes**

1. Open the Amazon EventBridge console at [https://console.aws.amazon.com/events/](https://console.aws.amazon.com/events/).

1. In the navigation pane, choose **Pipes**.

1. Under **Pipes**, choose one or more pipes you want to include in the generated CloudFormation template.

   For a single pipe, you can also choose the pipe name to display the pipe's details page.

1. Choose **CloudFormation Template**, and then choose which format you want EventBridge to generate the template in: **JSON** or **YAML**.

   EventBridge displays the template, generated in the selected format.

1. If you had EventBridge create a new execution role for any of the selected pipes, and you want EventBridge to include those roles in the template, choose **Include IAM roles created by console on your behalf**.

1. Download the template file or copy the template to the clipboard.
   + To download the template file, choose **Download**.
   + To copy the template to the clipboard, choose **Copy**.

1. To exit the template, choose **Cancel**. 
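After downloading the template, you might create a stack from it with the AWS CLI. The template file name and stack name here are illustrative:

```shell
# Create or update a stack from the downloaded template.
# --capabilities is required when the template includes IAM roles.
aws cloudformation deploy \
    --template-file pipe-template.yaml \
    --stack-name my-pipes-stack \
    --capabilities CAPABILITY_IAM
```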