

# Integrating Amazon OpenSearch Ingestion pipelines with other services and applications
<a name="configure-client"></a>

To successfully ingest data into an Amazon OpenSearch Ingestion pipeline, you must configure your client application (the *source*) to send data to the pipeline endpoint. Your source might be a client like Fluent Bit or the OpenTelemetry Collector, or a simple Amazon S3 bucket. The exact configuration differs for each client.

The important differences during source configuration (compared to sending data directly to an OpenSearch Service domain or OpenSearch Serverless collection) are the AWS service name (`osis`) and the host endpoint, which must be the pipeline endpoint.

## Constructing the ingestion endpoint
<a name="configure-client-endpoint"></a>

To ingest data into a pipeline, send it to the ingestion endpoint. To locate the ingestion URL, navigate to the **Pipeline settings** page and copy the **Ingestion URL**.

![\[Pipeline settings page showing details like status, capacity, and ingestion URL for data input.\]](http://docs.aws.amazon.com/opensearch-service/latest/developerguide/images/pipeline-endpoint.png)


To construct the full ingestion endpoint for push-based sources like [OTel trace](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-trace/) and [OTel metrics](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-metrics-source/), add the ingestion path from your pipeline configuration to the ingestion URL.

For example, say that your pipeline configuration has the following ingestion path:

![\[Input field for HTTP source path with example "/my/test_path" entered.\]](http://docs.aws.amazon.com/opensearch-service/latest/developerguide/images/ingestion-path.png)


The full ingestion endpoint, which you specify in your client configuration, will take the following format: `https://ingestion-pipeline-abcdefg.us-east-1.osis.amazonaws.com/my/test_path`.
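
The concatenation is simple enough to sketch in Python; the ingestion URL and path below are the example values from this section:

```python
def build_ingestion_endpoint(ingestion_url: str, ingestion_path: str) -> str:
    """Join a pipeline's ingestion URL and its configured ingestion path."""
    return ingestion_url.rstrip("/") + "/" + ingestion_path.lstrip("/")

endpoint = build_ingestion_endpoint(
    "https://ingestion-pipeline-abcdefg.us-east-1.osis.amazonaws.com",
    "/my/test_path",
)
print(endpoint)
# https://ingestion-pipeline-abcdefg.us-east-1.osis.amazonaws.com/my/test_path
```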

## Creating an ingestion role
<a name="configure-client-auth"></a>

All requests to OpenSearch Ingestion must be signed with [Signature Version 4](https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html). At minimum, the role that signs the request must be granted permission for the `osis:Ingest` action, which allows it to send data to an OpenSearch Ingestion pipeline.

For example, the following AWS Identity and Access Management (IAM) policy allows the corresponding role to send data to a single pipeline:

------
#### [ JSON ]


```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "osis:Ingest",
      "Resource": "arn:aws:osis:us-east-1:111122223333:pipeline/pipeline-name"
    }
  ]
}
```

------

**Note**  
To use the role for *all* pipelines, replace the ARN in the `Resource` element with a wildcard (`*`).
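
Every request follows the same SigV4 shape: hash the payload, build a canonical request, and sign it with the service name `osis`. The following stdlib-only Python sketch shows that shape; the endpoint and credentials are hypothetical placeholders, and real clients should use an AWS SDK (for example, botocore's `SigV4Auth`) rather than hand-rolling the signature:

```python
import datetime
import hashlib
import hmac

def _hmac(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

def sigv4_headers(method, host, path, payload, access_key, secret_key,
                  region="us-east-1", service="osis"):
    """Build SigV4 headers for an OpenSearch Ingestion request (sketch)."""
    now = datetime.datetime.utcnow()
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")

    payload_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_headers = f"host:{host}\nx-amz-date:{amz_date}\n"
    signed_headers = "host;x-amz-date"
    canonical_request = "\n".join(
        [method, path, "", canonical_headers, signed_headers, payload_hash])

    scope = f"{date_stamp}/{region}/{service}/aws4_request"
    string_to_sign = "\n".join(
        ["AWS4-HMAC-SHA256", amz_date, scope,
         hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()])

    # Derive the signing key: date -> region -> service -> aws4_request
    key = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    for part in (region, service, "aws4_request"):
        key = _hmac(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    authorization = (
        f"AWS4-HMAC-SHA256 Credential={access_key}/{scope}, "
        f"SignedHeaders={signed_headers}, Signature={signature}")
    return {"Host": host, "X-Amz-Date": amz_date,
            "Authorization": authorization}

# Hypothetical pipeline endpoint and dummy credentials for illustration
headers = sigv4_headers(
    "POST",
    "my-pipeline-abcdefg.us-east-1.osis.amazonaws.com",
    "/my/test_path",
    '[{"message": "hello"}]',
    "AKIAEXAMPLE",
    "dummy-secret-key",
)
```

Note that the service name is `osis`, not `es` or `aoss`; using the wrong service name produces a signature mismatch error.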

### Providing cross-account ingestion access
<a name="configure-client-cross-account"></a>

**Note**  
You can only provide cross-account ingestion access for public pipelines, not VPC pipelines.

You might need to ingest data into a pipeline from a different AWS account, such as an account that houses your source application. If the principal that writes to a pipeline is in a different account than the pipeline itself, you must configure the ingestion role in the pipeline account to trust that principal, and grant the principal permission to assume the role.

**To configure cross-account ingestion permissions**

1. Create the ingestion role with `osis:Ingest` permission (described in the previous section) within the same AWS account as the pipeline. For instructions, see [Creating IAM roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html).

1. Attach a [trust policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/roles-managingrole-editing-console.html#roles-managingrole_edit-trust-policy) to the ingestion role that allows a principal in another account to assume it:

------
#### [ JSON ]


   ```
   {
     "Version": "2012-10-17",
     "Statement": [{
        "Effect": "Allow",
        "Principal": {
          "AWS": "arn:aws:iam::111122223333:root"
         },
        "Action": "sts:AssumeRole"
     }]
   }
   ```

------

1. In the other account, configure your client application (for example, Fluent Bit) to assume the ingestion role. In order for this to work, the application account must grant permissions to the application user or role to assume the ingestion role.

   The following example identity-based policy allows the attached principal to assume `ingestion-role` from the pipeline account:

------
#### [ JSON ]


   ```
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Action": "sts:AssumeRole",
         "Resource": "arn:aws:iam::111122223333:role/ingestion-role"
       }
     ]
   }
   ```

------

The client application can then use the [AssumeRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html) operation to assume `ingestion-role` and ingest data into the associated pipeline.
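
For example, a Fluent Bit client in the application account might assume the ingestion role directly in its output configuration. This is a sketch, not a definitive configuration: the host, URI, and role ARN are hypothetical placeholders, and the `aws_auth`, `aws_service`, and `aws_role_arn` options assume a Fluent Bit version whose `http` output supports AWS SigV4 signing.

```
[OUTPUT]
    Name            http
    Match           *
    Host            pipeline-endpoint.us-east-1.osis.amazonaws.com
    Port            443
    URI             /log/ingest
    Format          json
    aws_auth        true
    aws_region      us-east-1
    aws_service     osis
    aws_role_arn    arn:aws:iam::111122223333:role/ingestion-role
    tls             On
```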

# Using an OpenSearch Ingestion pipeline with Atlassian Services
<a name="configure-client-atlassian"></a>

You can use the Atlassian Jira and Confluence source plugins to ingest data from Atlassian services into your OpenSearch Ingestion pipeline. These integrations enable you to create a unified searchable knowledge base by synchronizing complete Jira projects and Confluence spaces, while maintaining real-time relevance through continuous monitoring and automatic synchronization of updates.

------
#### [ Integrating with Jira ]

Transform your Jira experience with powerful contextual search capabilities by integrating your Jira content into OpenSearch. The Data Prepper [Atlassian Jira](https://www.atlassian.com/software/jira) source plugin enables you to create a unified searchable knowledge base by synchronizing complete Jira projects, while maintaining real-time relevance through continuous monitoring and automatic synchronization of updates. This integration allows for data synchronization with flexible filtering options for specific projects, issue types, and status, ensuring that only the information you need is imported. 

To ensure secure and reliable connectivity, the plugin supports multiple authentication methods, including basic API key authentication and OAuth2 authentication, with the added security of managing credentials using a secret stored in AWS Secrets Manager. It also features automatic token renewal for uninterrupted access, ensuring continuous operation. Built on Atlassian's [API version 2](https://developer.atlassian.com/cloud/jira/platform/rest/v2/intro/), this integration empowers teams to unlock valuable insights from their Jira data through OpenSearch's advanced search capabilities.

------
#### [ Integrating with Confluence ]

Enhance your team's knowledge management and collaboration capabilities by integrating [Atlassian Confluence](https://www.atlassian.com/software/confluence) content into OpenSearch through Data Prepper's Confluence source plugin. This integration enables you to create a centralized, searchable repository of collective knowledge, improving information discovery and team productivity. By synchronizing Confluence content and continuously monitoring for updates, the plugin ensures that your OpenSearch index remains up-to-date and comprehensive. 

The integration offers flexible filtering options, allowing you to selectively import content from specific spaces or page types, tailoring the synchronized content to your organization's needs. The plugin supports both basic API key and OAuth2 authentication methods, with the option of securely managing credentials through AWS Secrets Manager. The plugin's automatic token renewal feature ensures uninterrupted access and seamless operation. Built on Atlassian's Confluence [API](https://developer.atlassian.com/cloud/confluence/rest/v1/intro/#auth), this integration enables teams to leverage OpenSearch's advanced search capabilities across their Confluence content, enhancing information accessibility and utilization within the organization.

------

**Topics**
+ [Prerequisites](#atlassian-prerequisites)
+ [Configure a pipeline role](#atlassian-pipeline-role)
+ [Jira connector pipeline configuration](#jira-connector-pipeline)
+ [Confluence connector pipeline configuration](#confluence-connector-pipeline)
+ [Data consistency](#data-consistency)
+ [Limitations](#limitations)
+ [Metrics in CloudWatch for Atlassian connectors](#metrics)
+ [Connecting an Amazon OpenSearch Ingestion pipeline to Atlassian Jira or Confluence using OAuth 2.0](configure-client-atlassian-OAuth2-setup.md)

## Prerequisites
<a name="atlassian-prerequisites"></a>

Before you create your OpenSearch Ingestion pipeline, complete the following steps:

1. Prepare credentials for your Jira or Confluence site by choosing one of the following options. OpenSearch Ingestion requires only `ReadOnly` authorization to the content.

   1. **Option 1: API key** – Log in to your Atlassian account and use the information in the following topic to generate your API key:
      + [Manage API tokens for your Atlassian account](https://support.atlassian.com/atlassian-account/docs/manage-api-tokens-for-your-atlassian-account/)

   1. **Option 2: OAuth2** – Log in to your Atlassian account and use the information in [Connecting an Amazon OpenSearch Ingestion pipeline to Atlassian Jira or Confluence using OAuth 2.0](configure-client-atlassian-OAuth2-setup.md).

1. [Create a secret in AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html) to store the credentials created in the previous step. Make the following choices as you follow the procedure:
   + For **Secret type**, choose **Other type of secret**.
   + For **Key/value pairs**, create the following pairs, depending on your selected authorization type: 

------
#### [ API key ]

   ```
   {
      "username": "user-name-usually-email-id",
      "password": "api-key"
   }
   ```

------
#### [ OAuth 2.0 ]

   ```
   {
      "clientId": "client-id",
      "clientSecret": "client-secret",
      "accessToken": "access-token",
      "refreshToken": "refresh-token"
   }
   ```

------

   After you've created the secret, copy the Amazon Resource Name (ARN) of the secret. You will include it in the pipeline role permissions policy.

## Configure a pipeline role
<a name="atlassian-pipeline-role"></a>

The role that you pass in the pipeline configuration must have the following policy attached so that it can read and write the secret created in the prerequisites section.

------
#### [ JSON ]


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SecretReadWrite",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:PutSecretValue",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name-random-6-characters"
        }
    ]
}
```

------

The role should also have a policy attached that allows it to access and write to your chosen sink. For example, if you choose an OpenSearch Serverless collection as your sink, the policy looks similar to the following:

------
#### [ JSON ]


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "OpenSearchWritePolicy",
            "Effect": "Allow",
            "Action": "aoss:*",
            "Resource": "arn:aws:aoss:us-east-1:111122223333:collection/collection-id"
        }
    ]
}
```

------

## Jira connector pipeline configuration
<a name="jira-connector-pipeline"></a>

You can use a preconfigured Atlassian Jira blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

Replace the *placeholder values* with your own information.

```
version: "2"
extension:
  aws:
    secrets:
      jira-account-credentials:
        secret_id: "secret-arn"
        region: "secret-region"
        sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
atlassian-jira-pipeline:
  source:
    jira:
      # We currently support only one host URL.
      hosts: ["jira-host-url"]
      acknowledgments: true
      authentication:
        # Provide one of the authentication methods to use. Supported methods are 'basic' and 'oauth2'.
        # For basic authentication, the password is the API key that you generate using your Jira account.
        basic:
          username: ${{aws_secrets:jira-account-credentials:username}}
          password: ${{aws_secrets:jira-account-credentials:password}}
        # For OAuth2-based authentication, we require the following 4 key values stored in the secret.
        # Follow the Atlassian instructions at the following link to generate these keys:
        # https://developer.atlassian.com/cloud/confluence/oauth-2-3lo-apps/
        # If you are using OAuth2 authentication, we also require write permission to your AWS secret to
        # be able to write the renewed tokens back into the secret.
        # oauth2:
          # client_id: ${{aws_secrets:jira-account-credentials:clientId}}
          # client_secret: ${{aws_secrets:jira-account-credentials:clientSecret}}
          # access_token: ${{aws_secrets:jira-account-credentials:accessToken}}
          # refresh_token: ${{aws_secrets:jira-account-credentials:refreshToken}}
      filter:
        project:
          key:
            include:
              # This is not the project name.
              # It is an alphanumeric project key that you can find under project details in Jira.
              - "project-key"
              - "project-key"
            # exclude:
              # - "project-key"
              # - "project-key"
        issue_type:
          include:
            - "issue-type"
            # - "Story"
            # - "Bug"
            # - "Task"
          # exclude:
            # - "Epic"
        status:
          include:
            - "ticket-status"
            # - "To Do"
            # - "In Progress"
            # - "Done"
          # exclude:
            # - "Backlog"

  sink:
    - opensearch:
        # Provide an Amazon OpenSearch Service domain endpoint
        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
        index: "index_${getMetadata(\"project\")}"
        # Make sure to add a unique document ID, which in this case is the unique ticket ID.
        document_id: '${/id}'
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
          # Provide the region of the domain.
          region: "us-east-1"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false
          # serverless_options:
            # Specify a name here to create or update network policy for the serverless collection
            # network_policy_name: "network-policy-name"
        # Enable the 'distribution_version' setting if the Amazon OpenSearch Service domain is of version Elasticsearch 6.x
        # distribution_version: "es6"
        # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. 
        # See Compressing HTTP requests in Amazon OpenSearch Service
        # enable_request_compression: true/false
        # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
        dlq:
          s3:
            # Provide an S3 bucket
            bucket: "your-dlq-bucket-name"
            # Provide a key path prefix for the failed requests
            # key_path_prefix: "kinesis-pipeline/logs/dlq"
            # Provide the region of the bucket.
            region: "us-east-1"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
            sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
```

Key attributes in the Jira source:

1. **hosts**: Your Jira cloud or on-premises URL. Generally, it looks like `https://your-domain-name.atlassian.net/`.

1. **acknowledgments**: Set this option to `true` to guarantee delivery of the data all the way to the sink.

1. **authentication**: Describes how you want the pipeline to access your Jira instance. Choose `Basic` or `OAuth2` and specify the corresponding key attributes, referencing the keys in your AWS secret.

1. **filter**: This section helps you select which portion of your Jira data to extract and synchronize.

   1. **project**: List the project keys that you want to sync in the `include` section. Otherwise, list the projects that you want to exclude under the `exclude` section. Provide only one of the include or exclude options at any given time.

   1. **issue\_type**: Specific issue types that you want to sync. Follow the `include` or `exclude` pattern that suits your needs. Note that attachments will appear as anchor links to the original attachment, but the attachment content won't be extracted.

   1. **status**: Specific status filter you want to apply for the data extraction query. If you specify `include`, only tickets with those statuses will be synced. If you specify `exclude`, then all tickets except those with the listed excluded statuses will be synced.
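
The `include`/`exclude` behavior is the same for projects, issue types, and statuses, and can be sketched in Python (`apply_filter` is a hypothetical helper for illustration, not part of the plugin):

```python
def apply_filter(items, include=None, exclude=None):
    """Sketch of the include/exclude filter semantics described above.

    Provide only one of include or exclude at any given time.
    """
    if include is not None and exclude is not None:
        raise ValueError("Provide only one of include or exclude")
    if include is not None:
        return [item for item in items if item in include]
    if exclude is not None:
        return [item for item in items if item not in exclude]
    return items

statuses = ["To Do", "In Progress", "Done", "Backlog"]
print(apply_filter(statuses, include=["Done"]))     # ['Done']
print(apply_filter(statuses, exclude=["Backlog"]))  # ['To Do', 'In Progress', 'Done']
```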

## Confluence connector pipeline configuration
<a name="confluence-connector-pipeline"></a>

You can use a preconfigured Atlassian Confluence blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

```
version: "2"
extension:
  aws:
    secrets:
      confluence-account-credentials:
        secret_id: "secret-arn"
        region: "secret-region"
        sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
atlassian-confluence-pipeline:
  source:
    confluence:
      # We currently support only one host URL.
      hosts: ["confluence-host-url"]
      acknowledgments: true
      authentication:
        # Provide one of the authentication methods to use. Supported methods are 'basic' and 'oauth2'.
        # For basic authentication, the password is the API key that you generate using your Confluence account.
        basic:
          username: ${{aws_secrets:confluence-account-credentials:username}}
          password: ${{aws_secrets:confluence-account-credentials:password}}
        # For OAuth2 based authentication, we require the following 4 key values stored in the secret
        # Follow atlassian instructions at the following link to generate these keys:
        # https://developer.atlassian.com/cloud/confluence/oauth-2-3lo-apps/
        # If you are using OAuth2 authentication, we also require write permission to your AWS secret to
        # be able to write the renewed tokens back into the secret.
        # oauth2:
          # client_id: ${{aws_secrets:confluence-account-credentials:clientId}}
          # client_secret: ${{aws_secrets:confluence-account-credentials:clientSecret}}
          # access_token: ${{aws_secrets:confluence-account-credentials:accessToken}}
          # refresh_token: ${{aws_secrets:confluence-account-credentials:refreshToken}}
      filter:
        space:
          key:
            include:
              # This is not the space name.
              # It is a space key that you can find under space details in Confluence.
              - "space-key"
              - "space-key"
            # exclude:
              # - "space-key"
              # - "space-key"
        page_type:
          include:
            - "content type"
            # - "page"
            # - "blogpost"
            # - "comment"
          # exclude:
            # - "attachment"

  sink:
    - opensearch:
        # Provide an Amazon OpenSearch Service domain endpoint
        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
        index: "index_${getMetadata(\"space\")}"
        # Make sure to add a unique document ID, which in this case is the unique page ID.
        document_id: '${/id}'
        aws:
          # Provide the Amazon Resource Name (ARN) for a role with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com.
          sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
          # Provide the Region of the domain.
          region: "us-east-1"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false
          # serverless_options:
            # Specify a name here to create or update network policy for the serverless collection.
            # network_policy_name: "network-policy-name"
        # Enable the 'distribution_version' setting if the Amazon OpenSearch Service domain is of version Elasticsearch 6.x
        # distribution_version: "es6"
        # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. 
        # For more information, see Compressing HTTP requests in Amazon OpenSearch Service.
        # enable_request_compression: true/false
        # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
        dlq:
          s3:
            # Provide an S3 bucket
            bucket: "your-dlq-bucket-name"
            # Provide a key path prefix for the failed requests
            # key_path_prefix: "kinesis-pipeline/logs/dlq"
            # Provide the Region of the bucket.
            region: "us-east-1"
            # Provide the Amazon Resource Name (ARN) for a role with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
            sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
```

Key attributes in the Confluence source:

1. **hosts**: Your Confluence cloud or on-premises URL. Generally, it looks like `https://your-domain-name.atlassian.net/`.

1. **acknowledgments**: Set this option to `true` to guarantee delivery of the data all the way to the sink.

1. **authentication**: Describes how you want the pipeline to access your Confluence instance. Choose `Basic` or `OAuth2` and specify the corresponding key attributes referencing the keys in your AWS secret.

1. **filter**: This section helps you select which portion of your Confluence data to extract and synchronize.

   1. **space**: List the space keys that you want to sync in the `include` section. Otherwise, list the spaces that you want to exclude under the `exclude` section. Provide only one of the include or exclude options at any given time.

   1. **page\_type**: Specific page types (like page, blogpost, or attachment) that you want to sync. Follow the `include` or `exclude` pattern that suits your needs. Note that attachments will appear as anchor links to the original attachment, but the attachment content won't be extracted.

## Data consistency
<a name="data-consistency"></a>

Based on the filters specified in the pipeline YAML, selected projects (or spaces) will be extracted once and fully synced to the target sink. Then continuous change monitoring will capture changes as they occur and update the data in the sink. One exception is that the change monitoring syncs only `create` and `update` actions, not `delete` actions.

## Limitations
<a name="limitations"></a>
+ Delete actions aren't synced. Data that has been recorded in the sink remains in the sink. Updates overwrite the existing content with new changes if a document ID mapping is specified in the sink settings.
+ On-premises instances using older versions of Atlassian software that don't support the following APIs are not compatible with this source:
  + Jira Search API version 3
    + `rest/api/3/search`
    + `rest/api/3/issue`
  + Confluence
    + `wiki/rest/api/content/search`
    + `wiki/rest/api/content`
    + `wiki/rest/api/settings/systemInfo`

## Metrics in CloudWatch for Atlassian connectors
<a name="metrics"></a>

**Type: Jira connector metrics**


| Metric | Type | Description | 
| --- | --- | --- | 
| acknowledgementSetSuccesses.count | Counter | If acknowledgments are enabled, this metric provides the number of tickets successfully synced. | 
| acknowledgementSetFailures.count | Counter | If acknowledgments are enabled, this metric provides the number of tickets that failed to sync. | 
| crawlingTime.avg | Timer | The time it took to crawl through all the new changes. | 
| ticketFetchLatency.avg | Timer | The ticket fetch API latency average. | 
| ticketFetchLatency.max | Timer | The ticket fetch API latency maximum. | 
| ticketsRequested.count | Counter | Number of ticket fetch requests made. | 
| ticketRequestedFailed.count | Counter | Number of failed ticket fetch requests. | 
| ticketRequestedSuccess.count | Counter | Number of successful ticket fetch requests. | 
| searchCallLatency.avg | Timer | Search API call latency average. | 
| searchCallLatency.max | Timer | Search API call latency maximum. | 
| searchResultsFound.count | Counter | Number of items found in a given search call. | 
| searchRequestFailed.count | Counter | Search API call failures count. | 
| authFailures.count | Counter | Authentication failure count. | 

**Type: Confluence connector metrics**


| Metric | Type | Description | 
| --- | --- | --- | 
| acknowledgementSetSuccesses.count | Counter | If acknowledgments are enabled, this metric provides the number of pages successfully synced. | 
| acknowledgementSetFailures.count | Counter | If acknowledgments are enabled, this metric provides the number of pages that failed to sync. | 
| crawlingTime.avg | Timer | The time it took to crawl through all the new changes. | 
| pageFetchLatency.avg | Timer | Content fetching API latency (average). | 
| pageFetchLatency.max | Timer | Content fetching API latency (maximum). | 
| pagesRequested.count | Counter | Number of invocations of the content fetching API. | 
| pageRequestFailed.count | Counter | Number of failed content fetching API requests. | 
| pageRequestedSuccess.count | Counter | Number of successful content fetching API requests. | 
| searchCallLatency.avg | Timer | Search API call latency average. | 
| searchCallLatency.max | Timer | Search API call latency max. | 
| searchResultsFound.count | Counter | Number of items found in a given search call. | 
| searchRequestsFailed.count | Counter | Search API call failures count. | 
| authFailures.count | Counter | Authentication failure count. | 

# Connecting an Amazon OpenSearch Ingestion pipeline to Atlassian Jira or Confluence using OAuth 2.0
<a name="configure-client-atlassian-OAuth2-setup"></a>

Use the information in this topic to help you configure and connect an Amazon OpenSearch Ingestion pipeline to a Jira or Confluence account using OAuth 2.0 authentication. Perform this task when you are completing the [Prerequisites](configure-client-atlassian.md#atlassian-prerequisites) for using an OpenSearch Ingestion pipeline with Atlassian services but choose not to use API key credentials.

**Topics**
+ [Create an OAuth 2.0 integration app](#create-OAuth2-integration-app)
+ [Generating and refreshing an Atlassian Developer access token](#generate-and-refresh-jira-access-token)

## Create an OAuth 2.0 integration app
<a name="create-OAuth2-integration-app"></a>

Use the following procedure to help you create an OAuth 2.0 integration app on the Atlassian Developer website.

**To create an OAuth 2.0 integration app**

1. Log in to your Atlassian Developer account at [https://developer.atlassian.com/console/myapps/](https://developer.atlassian.com/console/myapps/).

1. Choose **Create**, **OAuth 2.0 integration**.

1. For **Name**, enter a name to identify the purpose of the app.

1. Select the **I agree to be bound by Atlassian's developer terms** check box, and then choose **Create**.

1. In the left navigation, choose **Authorization**, and then choose **Add**.

1. For **Callback URL**, enter any URL, such as **https://www.amazon.com** or **https://www.example.com**, and then choose **Save changes**.

1. In the left navigation, choose **Permissions**. In the row for the Jira API or the Confluence API, choose **Add**, and then choose **Configure**. Select all of the classic scopes read permissions, and then choose **Save**.

1. Choose the **Granular scopes** tab, and then choose **Edit Scopes** to open the **Edit Jira API** dialog box.

1. Select the permissions for the source plugin that you're using:

------
#### [ Jira ]

   ```
   read:audit-log:jira
   read:issue:jira
   read:issue-meta:jira
   read:attachment:jira
   read:comment:jira
   read:comment.property:jira
   read:field:jira
   read:field.default-value:jira
   read:field.option:jira
   read:field-configuration-scheme:jira
   read:field-configuration:jira
   read:issue-link:jira
   read:issue-link-type:jira
   read:issue.remote-link:jira
   read:issue.property:jira
   read:resolution:jira
   read:issue-details:jira
   read:issue-type:jira
   read:issue-worklog:jira
   read:issue-field-values:jira
   read:issue.changelog:jira
   read:issue.transition:jira
   read:issue.vote:jira
   read:jira-expressions:jira
   ```

------
#### [ Confluence ]

   ```
   read:content:confluence
   read:content-details:confluence
   read:space-details:confluence
   read:audit-log:confluence
   read:page:confluence
   read:blogpost:confluence
   read:custom-content:confluence
   read:comment:confluence
   read:space:confluence
   read:space.property:confluence
   read:space.setting:confluence
   read:content.property:confluence
   read:content.metadata:confluence
   read:task:confluence
   read:whiteboard:confluence
   read:app-data:confluence
   manage:confluence-configuration
   ```

------

1. Choose **Save**.

For related information, see [Implementing OAuth 2.0 (3LO)](https://developer.atlassian.com/cloud/oauth/getting-started/implementing-oauth-3lo/) and [Determining the scopes required for an operation](https://developer.atlassian.com/cloud/oauth/getting-started/determining-scopes/) on the Atlassian Developer website.

## Generating and refreshing an Atlassian Developer access token
<a name="generate-and-refresh-jira-access-token"></a>

Use the following procedure to help you generate and refresh an Atlassian Developer access token on the Atlassian Developer website.

**To generate and refresh a Jira access token**

1. Log in to your Atlassian Developer account at [https://developer.atlassian.com/console/myapps/](https://developer.atlassian.com/console/myapps/).

1. Choose the app you created in [Create an OAuth 2.0 integration app](#create-OAuth2-integration-app).

1. In the left navigation, choose **Authorization**.

1. Copy the granular Atlassian API authorization URL value from the bottom of the page and paste it into the text editor of your choice.

   The format of the URL is as follows:

   ```
   https://auth.atlassian.com/authorize?
   audience=api.atlassian.com 
   &client_id=YOUR_CLIENT_ID
   &scope=REQUESTED_SCOPE%20REQUESTED_SCOPE_TWO
   &redirect_uri=https://YOUR_APP_CALLBACK_URL
   &state=YOUR_USER_BOUND_VALUE 
   &response_type=code
   &prompt=consent
   ```

1. For `state=YOUR_USER_BOUND_VALUE`, change the parameter value to anything you choose, such as state="**sample\_text**".

   For more information, see [What is the state parameter used for?](https://developer.atlassian.com/cloud/jira/platform/oauth-2-3lo-apps/#what-is-the-state-parameter-used-for-) on the Atlassian Developer website.

1. Note that the `scope` section lists the granular scopes you selected in an earlier task. For example: `scope=read%3Ajira-work%20read%3Ajira-user%20offline_access`

   `offline_access` indicates that you want to generate a `refresh_token`.

1. Open a web browser window and enter the authorization URL you copied into the browser window's address bar.

1. When the target page opens, verify that the information is correct, and then choose **Accept** to be redirected to your Jira or Confluence homepage.

1. After the homepage has loaded, copy the URL of this page. It contains the authorization code for your application. You use this code to generate your access token. The entire section after `code=` is the authorization code.

1. Use the following cURL command to generate the access token. Replace the *placeholder values* with your own information.
**Tip**  
You can also use a third-party service such as Postman.

   ```
   curl --request POST --url 'https://auth.atlassian.com/oauth/token' \
   --header 'Content-Type: application/json' \
   --data '{"grant_type": "authorization_code",
   "client_id": "YOUR_CLIENT_ID",
   "client_secret": "YOUR_CLIENT_SECRET",
   "code": "AUTHORIZATION_CODE",
   "redirect_uri": "YOUR_CALLBACK_URL"}'
   ```

   The response to this command includes the values for `access_token` and `refresh_token`.
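The URL construction and code extraction above can be sketched in a few lines of Python using only the standard library. The client ID, callback URL, and state values here are placeholders that you replace with your own:

```python
from urllib.parse import urlencode, quote, urlparse, parse_qs

# Placeholder app details -- substitute your own values.
params = {
    "audience": "api.atlassian.com",
    "client_id": "YOUR_CLIENT_ID",
    "scope": "read:jira-work read:jira-user offline_access",
    "redirect_uri": "https://YOUR_APP_CALLBACK_URL",
    "state": "sample_text",
    "response_type": "code",
    "prompt": "consent",
}

# quote_via=quote percent-encodes spaces as %20, matching the
# scope=REQUESTED_SCOPE%20REQUESTED_SCOPE_TWO format shown earlier.
auth_url = "https://auth.atlassian.com/authorize?" + urlencode(params, quote_via=quote)

# After you approve the consent screen, the browser is redirected to your
# callback URL; everything after code= is the authorization code.
redirect_url = "https://YOUR_APP_CALLBACK_URL?code=AUTHORIZATION_CODE&state=sample_text"
auth_code = parse_qs(urlparse(redirect_url).query)["code"][0]
```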

# Using an OpenSearch Ingestion pipeline with Amazon Aurora
<a name="configure-client-aurora"></a>

You can use an OpenSearch Ingestion pipeline with Amazon Aurora to export existing data and stream changes (such as create, update, and delete) to Amazon OpenSearch Service domains and collections. The OpenSearch Ingestion pipeline incorporates change data capture (CDC) infrastructure to provide a high-scale, low-latency way to continuously stream data from Amazon Aurora. Aurora MySQL and Aurora PostgreSQL are supported.

There are two ways to use Amazon Aurora as a source to process data: with or without a full initial snapshot. A full initial snapshot is a snapshot of the specified tables that is exported to Amazon S3. From there, an OpenSearch Ingestion pipeline sends the data to one index in a domain, or partitions it across multiple indexes in a domain. To keep the data in Amazon Aurora and OpenSearch consistent, the pipeline syncs all of the create, update, and delete events in the Amazon Aurora tables with the documents saved in the OpenSearch index or indexes.

When you use a full initial snapshot, your OpenSearch Ingestion pipeline first ingests the snapshot and then starts reading data from Amazon Aurora change streams. It eventually catches up and maintains near real-time data consistency between Amazon Aurora and OpenSearch. 

You can also use the OpenSearch Ingestion integration with Amazon Aurora to track change data capture and ingest all updates in Aurora to OpenSearch. Choose this option if you already have a full snapshot from another mechanism, or if you just want to capture all changes to the data in an Amazon Aurora cluster.

When you choose this option, you need to [configure binary logging for Aurora MySQL](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/USER_LogAccess.MySQL.BinaryFormat.html) or [set up logical replication for Aurora PostgreSQL on the cluster](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraPostgreSQL.Replication.Logical.Configure.html).

**Topics**
+ [Aurora MySQL](aurora-mysql.md)
+ [Aurora PostgreSQL](aurora-PostgreSQL.md)

# Aurora MySQL
<a name="aurora-mysql"></a>

Complete the following steps to configure an OpenSearch Ingestion pipeline with Amazon Aurora for Aurora MySQL.

**Topics**
+ [Aurora MySQL prerequisites](#aurora-mysql-prereqs)
+ [Step 1: Configure the pipeline role](#aurora-mysql-pipeline-role)
+ [Step 2: Create the pipeline](#aurora-mysql-pipeline)
+ [Data consistency](#aurora-mysql-pipeline-consistency)
+ [Mapping data types](#aurora-mysql-pipeline-mapping)
+ [Limitations](#aurora-mysql-pipeline-limitations)
+ [Recommended CloudWatch Alarms](#aurora-mysql-pipeline-metrics)

## Aurora MySQL prerequisites
<a name="aurora-mysql-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. [Create a custom Aurora DB cluster parameter group in Amazon Aurora to configure binary logging](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/zero-etl.setting-up.html#zero-etl.parameters).

   ```
   aurora_enhanced_binlog=1
   binlog_backup=0
   binlog_format=ROW
   binlog_replication_globaldb=0
   binlog_row_image=full
   binlog_row_metadata=full
   ```

   Additionally, make sure the `binlog_transaction_compression` parameter is not set to `ON`, and that the `binlog_row_value_options` parameter is not set to `PARTIAL_JSON`.

1. [Select or create an Aurora MySQL DB cluster](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_GettingStartedAurora.CreatingConnecting.Aurora.html) and associate the parameter group created in the previous step with the DB cluster.

1. [Configure binary log retention to 24 hours or longer](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/mysql-stored-proc-configuring.html).

1. Set up username and password authentication on your Amazon Aurora cluster using [password management with Aurora and AWS Secrets Manager](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/rds-secrets-manager.html). You can also create a username/password combination by [creating a Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html).

1. If you use the full initial snapshot feature, create an AWS KMS key and an IAM role for exporting data from Amazon Aurora to Amazon S3.

   The IAM role should have the following permission policy:

------
#### [ JSON ]

****  

   ```
   {
    "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "ExportPolicy",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject*",
                   "s3:ListBucket",
                   "s3:GetObject*",
                   "s3:DeleteObject*",
                   "s3:GetBucketLocation"
               ],
               "Resource": [
                   "arn:aws:s3:::s3-bucket-used-in-pipeline",
                   "arn:aws:s3:::s3-bucket-used-in-pipeline/*"
               ]
           }
       ]
   }
   ```

------

   The role should also have the following trust relationships:

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "export.rds.amazonaws.com"
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

1. Select or create an OpenSearch Service domain or OpenSearch Serverless collection. For more information, see [Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains) and [Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your Amazon Aurora DB cluster to your domain or collection.
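For the binlog retention prerequisite, you can connect to the cluster's writer instance with a MySQL client and use the RDS-provided stored procedures. A sketch that sets a 24-hour retention window and then verifies the setting:

```
CALL mysql.rds_set_configuration('binlog retention hours', 24);
CALL mysql.rds_show_configuration;
```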

## Step 1: Configure the pipeline role
<a name="aurora-mysql-pipeline-role"></a>

After you have your Amazon Aurora pipeline prerequisites set up, [configure the pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) to use in your pipeline configuration. Also add the following permissions for the Amazon Aurora source to the role:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowReadingFromS3Buckets",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::s3_bucket",
                "arn:aws:s3:::s3_bucket/*"
            ]
        },
        {
            "Sid": "allowNetworkInterfacesActions",
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:111122223333:network-interface/*",
                "arn:aws:ec2:*:111122223333:subnet/*",
                "arn:aws:ec2:*:111122223333:security-group/*"
            ]
        },
        {
            "Sid": "allowDescribeEC2",
            "Effect": "Allow",
            "Action": [
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "allowTagCreation",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:111122223333:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/OSISManaged": "true"
                }
            }
        },
        {
            "Sid": "AllowDescribeInstances",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBInstances"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:db:*"
            ]
        },
        {
            "Sid": "AllowDescribeClusters",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBClusters"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:cluster:DB-id"
            ]
        },
        {
            "Sid": "AllowSnapshots",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBClusterSnapshots",
                "rds:CreateDBClusterSnapshot",
                "rds:AddTagsToResource"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:cluster:DB-id",
                "arn:aws:rds:us-east-2:111122223333:cluster-snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowExport",
            "Effect": "Allow",
            "Action": [
                "rds:StartExportTask"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:cluster:DB-id",
                "arn:aws:rds:us-east-2:111122223333:cluster-snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowDescribeExports",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeExportTasks"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": "us-east-2",
                    "aws:ResourceAccount": "111122223333"
                }
            }
        },
        {
            "Sid": "AllowAccessToKmsForExport",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:Encrypt",
                "kms:DescribeKey",
                "kms:RetireGrant",
                "kms:CreateGrant",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*"
            ],
            "Resource": [
                "arn:aws:kms:us-east-2:111122223333:key/export-key-id"
            ]
        },
        {
            "Sid": "AllowPassingExportRole",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::111122223333:role/export-role"
            ]
        },
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "arn:aws:secretsmanager:*:111122223333:secret:*"
            ]
        }
    ]
}
```

------

## Step 2: Create the pipeline
<a name="aurora-mysql-pipeline"></a>

Configure an OpenSearch Ingestion pipeline similar to the following. The example pipeline specifies an Amazon Aurora cluster as the source. 

```
version: "2"
aurora-mysql-pipeline:
  source:
    rds:
      db_identifier: "cluster-id"
      engine: aurora-mysql
      database: "database-name"
      tables:
        include:
          - "table1"
          - "table2"
      s3_bucket: "bucket-name"
      s3_region: "bucket-region"
      s3_prefix: "prefix-name"
      export:
        kms_key_id: "kms-key-id"
        iam_role_arn: "export-role-arn"
      stream: true
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
      authentication:
        username: ${{aws_secrets:secret:username}}
        password: ${{aws_secrets:secret:password}}
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
        index: "${getMetadata(\"table_name\")}"
        index_type: custom
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
extension:
  aws:
    secrets:
      secret:
        secret_id: "rds-secret-id"
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        refresh_interval: PT1H
```

You can use a preconfigured Amazon Aurora blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

To use Amazon Aurora as a source, you need to configure VPC access for the pipeline. The VPC you choose should be the same VPC that your Amazon Aurora source uses. Then choose one or more subnets and one or more VPC security groups. Note that the pipeline needs network access to an Aurora MySQL database, so you should also verify that your Aurora cluster is configured with a VPC security group that allows inbound traffic from the pipeline's VPC security group to the database port. For more information, see [Controlling access with security groups](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Overview.RDSSecurityGroups.html).

If you're using the AWS Management Console to create your pipeline, you must also attach your pipeline to your VPC in order to use Amazon Aurora as a source. To do so, find the **Network configuration** section, select the **Attach to VPC** checkbox, and choose your CIDR from one of the provided default options, or select your own. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).

To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and Amazon Aurora, ensure that the Amazon Aurora VPC CIDR is different from the CIDR for OpenSearch Ingestion.

For more information, see [Configuring VPC access for a pipeline](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security.html#pipeline-vpc-configure).
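If you script pipeline creation instead of using the console, you can create the pipeline and attach it to your VPC in a single call. The following AWS CLI sketch assumes the pipeline configuration above is saved as `pipeline.yaml`; the pipeline name, capacity units, subnet ID, and security group ID are placeholders:

```
aws osis create-pipeline \
  --pipeline-name aurora-mysql-pipeline \
  --min-units 1 \
  --max-units 4 \
  --pipeline-configuration-body file://pipeline.yaml \
  --vpc-options "SubnetIds=subnet-0123456789abcdef0,SecurityGroupIds=sg-0123456789abcdef0" \
  --region us-east-1
```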

## Data consistency
<a name="aurora-mysql-pipeline-consistency"></a>

The pipeline ensures data consistency by continuously polling or receiving changes from the Amazon Aurora cluster and updating the corresponding documents in the OpenSearch index.

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records in the OpenSearch domain or collection. If you want to ingest into an OpenSearch Serverless search collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless time series collection, note that the pipeline doesn't generate a document ID, so you must omit `document_id: "${getMetadata(\"primary_key\")}"` in your pipeline sink configuration. 

An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in Amazon Aurora is reconciled with the corresponding document changes in OpenSearch.

## Mapping data types
<a name="aurora-mysql-pipeline-mapping"></a>

An OpenSearch Ingestion pipeline maps MySQL data types to representations that are suitable for OpenSearch Service domains or collections to consume. If no mapping template is defined in OpenSearch, OpenSearch automatically determines field types with [dynamic mapping](https://opensearch.org/docs/latest/field-types/#dynamic-mapping) based on the first document sent. You can also explicitly define the field types that work best for you in OpenSearch through a mapping template.

The following table lists MySQL data types and corresponding OpenSearch field types. The *Default OpenSearch Field Type* column shows the field type that OpenSearch assigns through dynamic mapping if no explicit mapping is defined. The *Recommended OpenSearch Field Type* column shows the field type that we recommend you explicitly specify in a mapping template. These field types align more closely with the MySQL data types and usually enable better search features in OpenSearch.


| MySQL Data Type | Default OpenSearch Field Type | Recommended OpenSearch Field Type | 
| --- | --- | --- | 
| BIGINT | long | long | 
| BIGINT UNSIGNED | long | unsigned long | 
| BIT | long | byte, short, integer, or long depending on number of bits | 
| DECIMAL | text | double or keyword | 
| DOUBLE | float | double | 
| FLOAT | float | float | 
| INT | long | integer | 
| INT UNSIGNED | long | long | 
| MEDIUMINT | long | integer | 
| MEDIUMINT UNSIGNED | long | integer | 
| NUMERIC | text | double or keyword | 
| SMALLINT | long | short | 
| SMALLINT UNSIGNED | long | integer | 
| TINYINT | long | byte | 
| TINYINT UNSIGNED | long | short | 
| BINARY | text | binary | 
| BLOB | text | binary | 
| CHAR | text | text | 
| ENUM | text | keyword | 
| LONGBLOB | text | binary | 
| LONGTEXT | text | text | 
| MEDIUMBLOB | text | binary | 
| MEDIUMTEXT | text | text | 
| SET | text | keyword | 
| TEXT | text | text | 
| TINYBLOB | text | binary | 
| TINYTEXT | text | text | 
| VARBINARY | text | binary | 
| VARCHAR | text | text | 
| DATE | long (in epoch milliseconds) | date | 
| DATETIME | long (in epoch milliseconds) | date | 
| TIME | long (in epoch milliseconds) | date | 
| TIMESTAMP | long (in epoch milliseconds) | date | 
| YEAR | long (in epoch milliseconds) | date | 
| GEOMETRY | text (in WKT format) | geo\_shape | 
| GEOMETRYCOLLECTION | text (in WKT format) | geo\_shape | 
| LINESTRING | text (in WKT format) | geo\_shape | 
| MULTILINESTRING | text (in WKT format) | geo\_shape | 
| MULTIPOINT | text (in WKT format) | geo\_shape | 
| MULTIPOLYGON | text (in WKT format) | geo\_shape | 
| POINT | text (in WKT format) | geo\_point or geo\_shape | 
| POLYGON | text (in WKT format) | geo\_shape | 
| JSON | text | object | 

We recommend that you configure a dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you've configured the queue, OpenSearch Service sends to it all failed documents that can't be ingested due to dynamic mapping failures.

If automatic mappings fail, you can use `template_type` and `template_content` in your pipeline configuration to define explicit mapping rules. Alternatively, you can create mapping templates directly in your search domain or collection before you start the pipeline.
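For example, to index one of the WKT geometry columns from the table above as a `geo_shape` field rather than relying on dynamic mapping, you might define an inline index template on the sink. This is a sketch; the index name and the `location` field are illustrative:

```
sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      index: "my-table-index"
      template_type: index-template
      template_content: >
        {
          "template": {
            "mappings": {
              "properties": {
                "location": { "type": "geo_shape" }
              }
            }
          }
        }
```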

## Limitations
<a name="aurora-mysql-pipeline-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for Aurora MySQL:
+ The integration supports only one MySQL database per pipeline.
+ The integration does not currently support cross-region data ingestion; your Amazon Aurora cluster and OpenSearch domain must be in the same AWS Region.
+ The integration does not currently support cross-account data ingestion; your Amazon Aurora cluster and OpenSearch Ingestion pipeline must be in the same AWS account. 
+ Ensure that the Amazon Aurora cluster has authentication enabled using Secrets Manager, which is the only supported authentication mechanism.
+ The existing pipeline configuration can't be updated to ingest data from a different database and/or a different table. To update the database and/or table name of a pipeline, you have to stop the pipeline and restart it with an updated configuration, or create a new pipeline.
+ Data Definition Language (DDL) statements are generally not supported. Data consistency will not be maintained if:
  + Primary keys are changed (add/delete/rename).
  + Tables are dropped/truncated.
  + Column names or data types are changed.
+ If the MySQL tables to sync don't have primary keys defined, data consistency is not guaranteed. You need to properly define a custom `document_id` option in the OpenSearch sink configuration to be able to sync updates and deletes to OpenSearch.
+ Foreign key references with cascading delete actions are not supported and can result in data inconsistency between Aurora MySQL and OpenSearch.
+ Supported versions: Aurora MySQL version 3.05.2 and higher.

## Recommended CloudWatch Alarms
<a name="aurora-mysql-pipeline-metrics"></a>

The following CloudWatch metrics are recommended for monitoring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the number of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can set up CloudWatch alarms to perform an action when one of these metrics exceeds a specified value for a specified amount of time.


| Metric | Description | 
| --- | --- | 
| pipeline-name.rds.credentialsChanged | This metric indicates how often AWS secrets are rotated. | 
| pipeline-name.rds.executorRefreshErrors | This metric indicates failures to refresh AWS secrets. | 
| pipeline-name.rds.exportRecordsTotal | This metric indicates the number of records exported from Amazon Aurora. | 
| pipeline-name.rds.exportRecordsProcessed | This metric indicates the number of records processed by OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.exportRecordProcessingErrors | This metric indicates the number of processing errors in an OpenSearch Ingestion pipeline while reading the data from an Amazon Aurora cluster. | 
| pipeline-name.rds.exportRecordsSuccessTotal | This metric indicates the total number of export records processed successfully. | 
| pipeline-name.rds.exportRecordsFailedTotal | This metric indicates the total number of export records that failed to process. | 
| pipeline-name.rds.bytesReceived | This metric indicates the total number of bytes received by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.bytesProcessed | This metric indicates the total number of bytes processed by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.streamRecordsSuccessTotal | This metric indicates the number of records successfully processed from the stream. | 
| pipeline-name.rds.streamRecordsFailedTotal | This metric indicates the total number of records that failed to process from the stream. | 
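As a sketch, an alarm on stream processing failures might look like the following with the AWS CLI. The alarm name, threshold, period, and SNS topic ARN are placeholders, and you replace `pipeline-name` with your pipeline's name; OpenSearch Ingestion publishes pipeline metrics under the `AWS/OSIS` namespace, but verify the exact metric name and dimensions in the CloudWatch console first:

```
aws cloudwatch put-metric-alarm \
  --alarm-name my-pipeline-stream-failures \
  --namespace AWS/OSIS \
  --metric-name "pipeline-name.rds.streamRecordsFailedTotal" \
  --statistic Sum \
  --period 300 \
  --evaluation-periods 1 \
  --threshold 0 \
  --comparison-operator GreaterThanThreshold \
  --alarm-actions arn:aws:sns:us-east-1:111122223333:my-alerts-topic
```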

# Aurora PostgreSQL
<a name="aurora-PostgreSQL"></a>

Complete the following steps to configure an OpenSearch Ingestion pipeline with Amazon Aurora for Aurora PostgreSQL.

**Topics**
+ [Aurora PostgreSQL prerequisites](#aurora-PostgreSQL-prereqs)
+ [Step 1: Configure the pipeline role](#aurora-mysql-pipeline-role)
+ [Step 2: Create the pipeline](#aurora-PostgreSQL-pipeline)
+ [Data consistency](#aurora-mysql-pipeline-consistency)
+ [Mapping data types](#aurora-PostgreSQL-pipeline-mapping)
+ [Limitations](#aurora-PostgreSQL-pipeline-limitations)
+ [Recommended CloudWatch Alarms](#aurora-mysql-pipeline-metrics)

## Aurora PostgreSQL prerequisites
<a name="aurora-PostgreSQL-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. [Create a custom DB cluster parameter group](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_GettingStartedAurora.CreatingConnecting.Aurora.html) in Amazon Aurora to configure logical replication.

   ```
   rds.logical_replication=1
   aurora.enhanced_logical_replication=1
   aurora.logical_replication_backup=0
   aurora.logical_replication_globaldb=0
   ```

1. [Select or create an Aurora PostgreSQL DB cluster](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/CHAP_GettingStartedAurora.CreatingConnecting.Aurora.html) and associate the parameter group created in step 1 with the DB cluster.

1. Set up username and password authentication on your Amazon Aurora cluster using [password management with Aurora and AWS Secrets Manager](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/rds-secrets-manager.html). You can also create a username/password combination by [creating a Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html).

1. If you use the full initial snapshot feature, create an AWS KMS key and an IAM role for exporting data from Amazon Aurora to Amazon S3.

   The IAM role should have the following permission policy:

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "ExportPolicy",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject*",
                   "s3:ListBucket",
                   "s3:GetObject*",
                   "s3:DeleteObject*",
                   "s3:GetBucketLocation"
               ],
               "Resource": [
                   "arn:aws:s3:::s3-bucket-used-in-pipeline",
                   "arn:aws:s3:::s3-bucket-used-in-pipeline/*"
               ]
           }
       ]
   }
   ```

------

   The role should also have the following trust relationships:

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "export.rds.amazonaws.com"
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

1. Select or create an OpenSearch Service domain or OpenSearch Serverless collection. For more information, see [Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains) and [Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your Amazon Aurora DB cluster to your domain or collection.

## Step 1: Configure the pipeline role
<a name="aurora-mysql-pipeline-role"></a>

After you have your Amazon Aurora pipeline prerequisites set up, [configure the pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) to use in your pipeline configuration. Also add the following permissions for the Amazon Aurora source to the role:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowReadingFromS3Buckets",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::s3_bucket",
                "arn:aws:s3:::s3_bucket/*"
            ]
        },
        {
            "Sid": "allowNetworkInterfacesActions",
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:111122223333:network-interface/*",
                "arn:aws:ec2:*:111122223333:subnet/*",
                "arn:aws:ec2:*:111122223333:security-group/*"
            ]
        },
        {
            "Sid": "allowDescribeEC2",
            "Effect": "Allow",
            "Action": [
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "allowTagCreation",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:111122223333:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/OSISManaged": "true"
                }
            }
        },
        {
            "Sid": "AllowDescribeInstances",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBInstances"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:db:*"
            ]
        },
        {
            "Sid": "AllowDescribeClusters",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBClusters"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:cluster:DB-id"
            ]
        },
        {
            "Sid": "AllowSnapshots",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBClusterSnapshots",
                "rds:CreateDBClusterSnapshot",
                "rds:AddTagsToResource"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:cluster:DB-id",
                "arn:aws:rds:us-east-2:111122223333:cluster-snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowExport",
            "Effect": "Allow",
            "Action": [
                "rds:StartExportTask"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:cluster:DB-id",
                "arn:aws:rds:us-east-2:111122223333:cluster-snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowDescribeExports",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeExportTasks"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": "us-east-2",
                    "aws:ResourceAccount": "111122223333"
                }
            }
        },
        {
            "Sid": "AllowAccessToKmsForExport",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:Encrypt",
                "kms:DescribeKey",
                "kms:RetireGrant",
                "kms:CreateGrant",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*"
            ],
            "Resource": [
                "arn:aws:kms:us-east-2:111122223333:key/export-key-id"
            ]
        },
        {
            "Sid": "AllowPassingExportRole",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::111122223333:role/export-role"
            ]
        },
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "arn:aws:secretsmanager:*:111122223333:secret:*"
            ]
        }
    ]
}
```

------

## Step 2: Create the pipeline
<a name="aurora-PostgreSQL-pipeline"></a>

Configure an OpenSearch Ingestion pipeline similar to the following. The example pipeline specifies an Aurora PostgreSQL cluster as the source.

```
version: "2"
aurora-postgres-pipeline:
  source:
    rds:
      db_identifier: "cluster-id"
      engine: aurora-postgresql
      database: "database-name"
      tables:
        include:
          - "schema1.table1"
          - "schema2.table2"
      s3_bucket: "bucket-name"
      s3_region: "bucket-region"
      s3_prefix: "prefix-name"
      export:
        kms_key_id: "kms-key-id"
        iam_role_arn: "export-role-arn"
      stream: true
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
      authentication:
        username: ${{aws_secrets:secret:username}}
        password: ${{aws_secrets:secret:password}}
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
        index: "${getMetadata(\"table_name\")}"
        index_type: custom
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
extension:
  aws:
    secrets:
      secret:
        secret_id: "rds-secret-id"
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        refresh_interval: PT1H
```

**Note**  
You can use a preconfigured Amazon Aurora blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

To use Amazon Aurora as a source, you must configure VPC access for the pipeline. The VPC you choose should be the same VPC your Amazon Aurora source uses. Then choose one or more subnets and one or more VPC security groups. The pipeline needs network access to the Aurora PostgreSQL database, so you should also verify that your Aurora cluster is configured with a VPC security group that allows inbound traffic from the pipeline's VPC security group to the database port. For more information, see [Controlling access with security groups](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Overview.RDSSecurityGroups.html).

If you're using the AWS Management Console to create your pipeline, you must also attach your pipeline to your VPC in order to use Amazon Aurora as a source. To do this, find the **Network configuration** section, choose **Attach to VPC**, and choose your CIDR from one of the provided default options, or select your own. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).

To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and Amazon Aurora, ensure that the Amazon Aurora VPC CIDR is different from the CIDR for OpenSearch Ingestion.

For more information, see [Configuring VPC access for a pipeline](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security.html#pipeline-vpc-configure).

## Data consistency
<a name="aurora-mysql-pipeline-consistency"></a>

The pipeline ensures data consistency by continuously polling or receiving changes from the Amazon Aurora cluster and updating the corresponding documents in the OpenSearch index.

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records into the OpenSearch domain or collection. If you want to ingest into an OpenSearch Serverless search collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless time series collection, note that the pipeline doesn't generate a document ID, so you must omit `document_id: "${getMetadata(\"primary_key\")}"` from your pipeline sink configuration.

An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in Amazon Aurora is reconciled with the corresponding document changes in OpenSearch.
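
As a rough illustration (this is a hypothetical sketch, not the pipeline's actual implementation, which is internal to the `rds` source and `opensearch` sink), the mapping from change event types to bulk indexing actions looks like the following:

```
# Hypothetical sketch of how CDC event types could translate into
# OpenSearch bulk actions; the real mapping is handled by the pipeline.
def to_bulk_action(cdc_event_type: str) -> str:
    """Map a change-data-capture event type to an OpenSearch bulk action."""
    mapping = {
        "INSERT": "index",   # create or overwrite the document
        "UPDATE": "index",   # re-index the new version of the row
        "DELETE": "delete",  # remove the corresponding document
    }
    return mapping[cdc_event_type.upper()]
```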

## Mapping data types
<a name="aurora-PostgreSQL-pipeline-mapping"></a>

An OpenSearch Ingestion pipeline maps Aurora PostgreSQL data types to representations that are suitable for OpenSearch Service domains or collections to consume. If no mapping template is defined in OpenSearch, OpenSearch automatically determines field types with a [dynamic mapping](https://opensearch.org/docs/latest/field-types/#dynamic-mapping) based on the first document sent. You can also explicitly define the field types that work best for you in OpenSearch through a mapping template.

The table below lists Aurora PostgreSQL data types and corresponding OpenSearch field types. The *Default OpenSearch Field Type* column shows the corresponding field type in OpenSearch if no explicit mapping is defined. In this case, OpenSearch automatically determines field types with dynamic mapping. The *Recommended OpenSearch Field Type* column is the corresponding recommended field type to explicitly specify in a mapping template. These field types are more closely aligned with the data types in Aurora PostgreSQL and can usually enable better search features available in OpenSearch.


| Aurora PostgreSQL Data Type | Default OpenSearch Field Type | Recommended OpenSearch Field Type | 
| --- | --- | --- | 
| smallint | long | short | 
| integer | long | integer | 
| bigint | long | long | 
| decimal | text | double or keyword | 
| numeric[ (p, s) ] | text | double or keyword | 
| real | float | float | 
| double precision | float | double | 
| smallserial | long | short | 
| serial | long | integer | 
| bigserial | long | long | 
| money | object | object | 
| character varying(n) | text | text | 
| varchar(n) | text | text | 
| character(n) | text | text | 
| char(n) | text | text | 
| bpchar(n) | text | text | 
| bpchar | text | text | 
| text | text | text | 
| enum | text | text | 
| bytea | text | binary | 
| timestamp [ (p) ] [ without time zone ] | long (in epoch milliseconds) | date | 
| timestamp [ (p) ] with time zone | long (in epoch milliseconds) | date | 
| date | long (in epoch milliseconds) | date | 
| time [ (p) ] [ without time zone ] | long (in epoch milliseconds) | date | 
| time [ (p) ] with time zone | long (in epoch milliseconds) | date | 
| interval [ fields ] [ (p) ] | text (ISO8601 format) | text | 
| boolean | boolean | boolean | 
| point | text (in WKT format) | geo\_shape | 
| line | text (in WKT format) | geo\_shape | 
| lseg | text (in WKT format) | geo\_shape | 
| box | text (in WKT format) | geo\_shape | 
| path | text (in WKT format) | geo\_shape | 
| polygon | text (in WKT format) | geo\_shape | 
| circle | object | object | 
| cidr | text | text | 
| inet | text | text | 
| macaddr | text | text | 
| macaddr8 | text | text | 
| bit(n) | long | byte, short, integer, or long (depending on number of bits) | 
| bit varying(n) | long | byte, short, integer, or long (depending on number of bits) | 
| json | object | object | 
| jsonb | object | object | 
| jsonpath | text | text | 

We recommend that you configure the dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you've configured the queue, OpenSearch Service sends to it all documents that fail to be ingested due to dynamic mapping failures.

In case automatic mappings fail, you can use `template_type` and `template_content` in your pipeline configuration to define explicit mapping rules. Alternatively, you can create mapping templates directly in your search domain or collection before you start the pipeline.
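
For example, the following sketch (using a hypothetical `created_at` column name) shows how `template_type` and `template_content` might appear in the sink configuration:

```
sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      index: "${getMetadata(\"table_name\")}"
      # Explicit mapping template; "created_at" is an illustrative column name
      template_type: index-template
      template_content: |
        {
          "template": {
            "mappings": {
              "properties": {
                "created_at": { "type": "date" }
              }
            }
          }
        }
```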

## Limitations
<a name="aurora-PostgreSQL-pipeline-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for Aurora PostgreSQL:
+ The integration only supports one Aurora PostgreSQL database per pipeline.
+ The integration does not currently support cross-region data ingestion; your Amazon Aurora cluster and OpenSearch domain must be in the same AWS Region.
+ The integration does not currently support cross-account data ingestion; your Amazon Aurora cluster and OpenSearch Ingestion pipeline must be in the same AWS account. 
+ Ensure that the Amazon Aurora cluster has authentication enabled using AWS Secrets Manager, which is the only supported authentication mechanism.
+ The existing pipeline configuration can't be updated to ingest data from a different database and/or a different table. To update the database and/or table name of a pipeline, you have to stop the pipeline and restart it with an updated configuration, or create a new pipeline.
+ Data Definition Language (DDL) statements are generally not supported. Data consistency will not be maintained if:
  + Primary keys are changed (add/delete/rename).
  + Tables are dropped/truncated.
  + Column names or data types are changed.
+ If the Aurora PostgreSQL tables to sync don’t have primary keys defined, data consistency isn't guaranteed. You must define the `document_id` option in the OpenSearch sink configuration appropriately to be able to sync updates and deletes to OpenSearch.
+ Supported versions: Aurora PostgreSQL version 16.4 and higher.

## Recommended CloudWatch Alarms
<a name="aurora-mysql-pipeline-metrics"></a>

The following CloudWatch metrics are recommended for monitoring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the number of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can set up CloudWatch alarms to perform an action when one of these metrics exceeds a specified value for a specified amount of time.


| Metric | Description | 
| --- | --- | 
| pipeline-name.rds.credentialsChanged | This metric indicates how often AWS secrets are rotated. | 
| pipeline-name.rds.executorRefreshErrors | This metric indicates failures to refresh AWS secrets. | 
| pipeline-name.rds.exportRecordsTotal | This metric indicates the number of records exported from Amazon Aurora. | 
| pipeline-name.rds.exportRecordsProcessed | This metric indicates the number of records processed by OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.exportRecordProcessingErrors | This metric indicates the number of processing errors in an OpenSearch Ingestion pipeline while reading the data from an Amazon Aurora cluster. | 
| pipeline-name.rds.exportRecordsSuccessTotal | This metric indicates the total number of export records processed successfully. | 
| pipeline-name.rds.exportRecordsFailedTotal | This metric indicates the total number of export records that failed to process. | 
| pipeline-name.rds.bytesReceived | This metric indicates the total number of bytes received by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.bytesProcessed | This metric indicates the total number of bytes processed by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.streamRecordsSuccessTotal | This metric indicates the number of records successfully processed from the stream. | 
| pipeline-name.rds.streamRecordsFailedTotal | This metric indicates the total number of records that failed to process from the stream. | 

# Using an OpenSearch Ingestion pipeline with Amazon DynamoDB
<a name="configure-client-ddb"></a>

You can use the [DynamoDB](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/) plugin to stream table events, such as creates, updates, and deletes, to Amazon OpenSearch Service domains and Amazon OpenSearch Serverless collections. The pipeline uses change data capture (CDC) for high-scale, low-latency streaming.

You can process DynamoDB data with or without a full initial snapshot. 
+ **With a full snapshot** – DynamoDB uses [point-in-time recovery](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery.html) (PITR) to create a backup and uploads it to Amazon S3. OpenSearch Ingestion then indexes the snapshot in one or multiple OpenSearch indexes. To maintain consistency, the pipeline synchronizes all DynamoDB changes with OpenSearch. This option requires you to enable both PITR and [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.CoreComponents.html#HowItWorks.CoreComponents.Streams).
+ **Without a snapshot** – OpenSearch Ingestion streams only new DynamoDB events. Choose this option if you already have a snapshot or need real-time streaming without historical data. This option requires you to enable only DynamoDB Streams.

For more information, see [DynamoDB zero-ETL integration with Amazon OpenSearch Service](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/OpenSearchIngestionForDynamoDB.html) in the *Amazon DynamoDB Developer Guide*.

**Topics**
+ [Prerequisites](#s3-prereqs)
+ [Step 1: Configure the pipeline role](#ddb-pipeline-role)
+ [Step 2: Create the pipeline](#ddb-pipeline)
+ [Data consistency](#ddb-pipeline-consistency)
+ [Mapping data types](#ddb-pipeline-mapping)
+ [Limitations](#ddb-pipeline-limitations)
+ [Recommended CloudWatch Alarms for DynamoDB](#ddb-pipeline-metrics)

## Prerequisites
<a name="s3-prereqs"></a>

To set up your pipeline, you must have a DynamoDB table with DynamoDB Streams enabled. Your stream should use the `NEW_IMAGE` stream view type. However, OpenSearch Ingestion pipelines can also stream events with `NEW_AND_OLD_IMAGES` if this stream view type fits your use case.

If you're using snapshots, you must also enable point-in-time recovery on your table. For more information, see [Creating a table](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.CreateTable), [Enabling point-in-time recovery](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery_Howitworks.html#howitworks_enabling), and [Enabling a stream](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) in the *Amazon DynamoDB Developer Guide*.

## Step 1: Configure the pipeline role
<a name="ddb-pipeline-role"></a>

After you have your DynamoDB table set up, [set up the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add the following DynamoDB permissions in the role:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowRunExportJob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:ExportTableToPointInTime"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111122223333:table/my-table"
            ]
        },
        {
            "Sid": "allowCheckExportjob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeExport"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111122223333:table/my-table/export/*"
            ]
        },
        {
            "Sid": "allowReadFromStream",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111122223333:table/my-table/stream/*"
            ]
        },
        {
            "Sid": "allowReadAndWriteToS3ForExport",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::amzn-s3-demo-bucket/export-folder/*"
            ]
        }
    ]
}
```

------

You can also use an AWS KMS customer managed key to encrypt the export data files. To decrypt the exported objects, specify `s3_sse_kms_key_id` for the key ID in the export configuration of the pipeline with the following format: `arn:aws:kms:region:account-id:key/my-key-id`. The following policy includes the required permissions for using a customer managed key:

```
{
    "Sid": "allowUseOfCustomManagedKey",
    "Effect": "Allow",
    "Action": [
        "kms:GenerateDataKey",
        "kms:Decrypt"
    ],
    "Resource": "arn:aws:kms:region:account-id:key/my-key-id"
}
```
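
For example, the following sketch shows where `s3_sse_kms_key_id` fits in the pipeline's export configuration (table, bucket, prefix, and key values are placeholders):

```
source:
  dynamodb:
    tables:
    - table_arn: "arn:aws:dynamodb:region:account-id:table/my-table"
      export:
        s3_bucket: "my-bucket"
        s3_prefix: "export/"
        # KMS key used to decrypt the exported data files
        s3_sse_kms_key_id: "arn:aws:kms:region:account-id:key/my-key-id"
```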

## Step 2: Create the pipeline
<a name="ddb-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies DynamoDB as the source. This sample pipeline ingests data from `table-a` with the PITR snapshot, followed by events from DynamoDB Streams. A start position of `LATEST` indicates that the pipeline should read the latest data from DynamoDB Streams.

```
version: "2"
cdc-pipeline:
  source:
    dynamodb:
      tables:
      - table_arn: "arn:aws:dynamodb:region:account-id:table/table-a"  
        export:
          s3_bucket: "my-bucket"
          s3_prefix: "export/"
        stream:
          start_position: "LATEST"
      aws:
        region: "us-east-1"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.region.es.amazonaws.com"]
      index: "${getMetadata(\"table-name\")}"
      index_type: custom
      normalize_index: true
      document_id: "${getMetadata(\"primary_key\")}"
      action: "${getMetadata(\"opensearch_action\")}"
      document_version: "${getMetadata(\"document_version\")}"
      document_version_type: "external"
```

You can use a preconfigured DynamoDB blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

## Data consistency
<a name="ddb-pipeline-consistency"></a>

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records in the OpenSearch domain or collection. 

If you want to ingest into an OpenSearch Serverless *search* collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless *time series* collection, note that the pipeline doesn't generate a document ID.
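
For example, a sink for an OpenSearch Serverless time series collection omits the `document_id` line. The following sketch uses placeholder endpoint and role values:

```
sink:
  - opensearch:
      hosts: ["https://collection-id.us-east-1.aoss.amazonaws.com"]
      index: "my-index"
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
        serverless: true
```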

An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in DynamoDB is reconciled with the corresponding document changes in OpenSearch.

## Mapping data types
<a name="ddb-pipeline-mapping"></a>

OpenSearch Service dynamically maps the DynamoDB data types in each incoming document to corresponding OpenSearch data types. The following table shows how OpenSearch Service automatically maps various data types.


| Data type | OpenSearch | DynamoDB | 
| --- | --- | --- | 
| Number |  OpenSearch automatically maps numeric data. If the number is a whole number, OpenSearch maps it as a long value. If the number is fractional, then OpenSearch maps it as a float value. OpenSearch dynamically maps various attributes based on the first sent document. If you have a mix of data types for the same attribute in DynamoDB, such as both a whole number and a fractional number, mapping might fail.  For example, if your first document has an attribute that is a whole number, and a later document has that same attribute as a fractional number, OpenSearch fails to ingest the second document. In these cases, you should provide an explicit mapping template, such as the following: <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "MixedNumberAttribute": {<br />     "type": "float"<br />    }<br />   }<br />  }<br /> }<br />}</pre> If you need double precision, use string-type field mapping. There is no equivalent numeric type that supports 38 digits of precision in OpenSearch.  |  DynamoDB supports [numbers](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Number).  | 
| Number set | OpenSearch automatically maps a number set into an array of either long values or float values. As with the scalar numbers, this depends on whether the first number ingested is a whole number or a fractional number. You can provide mappings for number sets the same way that you map scalar strings. |  DynamoDB supports types that represent [sets of numbers](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes).  | 
| String |  OpenSearch automatically maps string values as text. In some situations, such as enumerated values, you can map to the keyword type. The following example shows how to map a DynamoDB attribute named `PartType` to an OpenSearch keyword. <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "PartType": {<br />     "type": "keyword"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  |  DynamoDB supports [strings](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.String).  | 
| String set |  OpenSearch automatically maps a string set into an array of strings. You can provide mappings for string sets the same way that you map scalar strings.  | DynamoDB supports types that represent [sets of strings](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes). | 
| Binary |  OpenSearch automatically maps binary data as text. You can provide a mapping to write these as binary fields in OpenSearch. The following example shows how to map a DynamoDB attribute named `ImageData` to an OpenSearch binary field. <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "ImageData": {<br />     "type": "binary"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | DynamoDB supports [binary type attributes](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Binary). | 
| Binary set |  OpenSearch automatically maps a binary set into an array of binary data as text. You can provide mappings for number sets the same way that you map scalar binary.  | DynamoDB supports types that represent [sets of binary values](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes). | 
| Boolean |  OpenSearch maps a DynamoDB Boolean type into an OpenSearch Boolean type.  |  DynamoDB supports [Boolean type attributes](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Boolean).  | 
| Null |  OpenSearch can ingest documents with the DynamoDB null type. It saves the value as a null value in the document. There is no mapping for this type, and this field is not indexed or searchable. If the same attribute name is used for a null type and then later changes to different type such as string, OpenSearch creates a dynamic mapping for the first non-null value. Subsequent values can still be DynamoDB null values.  | DynamoDB supports [null type attributes](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Null). | 
| Map |  OpenSearch maps DynamoDB map attributes to nested fields. The same mappings apply within a nested field. The following example maps a string in a nested field to a keyword type in OpenSearch: <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "AdditionalDescriptions": {<br />     "properties": {<br />      "PartType": {<br />       "type": "keyword"<br />      }<br />     }<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | DynamoDB supports [map type attributes](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Document.Map). | 
| List |  OpenSearch provides different results for DynamoDB lists, depending on what is in the list. When a list contains all of the same type of scalar types (for example, a list of all strings), then OpenSearch ingests the list as an array of that type. This works for string, number, Boolean, and null types. The restrictions for each of these types are the same as restrictions for a scalar of that type. You can also provide mappings for lists of maps by using the same mapping as you would use for a map. You can't provide a list of mixed types.   |  DynamoDB supports [list type attributes](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Document.List).  | 
| Set |  OpenSearch provides different results for DynamoDB sets depending on what is in the set. When a set contains all of the same type of scalar types (for example, a set of all strings), then OpenSearch ingests the set as an array of that type. This works for string, number, Boolean, and null types. The restrictions for each of these types are the same as the restrictions for a scalar of that type. You can also provide mappings for sets of maps by using the same mapping as you would use for a map. You can't provide a set of mixed types.   | DynamoDB supports types that represent [sets](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes). | 

We recommend that you configure the dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you've configured the queue, OpenSearch Service sends to it all documents that fail to be ingested due to dynamic mapping failures.
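
If you use Amazon S3 as the DLQ destination, the queue is configured on the `opensearch` sink. The following sketch uses placeholder bucket, prefix, and role values:

```
sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      index: "my-index"
      dlq:
        s3:
          bucket: "my-dlq-bucket"
          key_path_prefix: "dlq-files/"
          region: "us-east-1"
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
```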

In case automatic mappings fail, you can use `template_type` and `template_content` in your pipeline configuration to define explicit mapping rules. Alternatively, you can create mapping templates directly in your search domain or collection before you start the pipeline. 

## Limitations
<a name="ddb-pipeline-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for DynamoDB:
+ The OpenSearch Ingestion integration with DynamoDB currently doesn't support cross-Region ingestion. Your DynamoDB table and OpenSearch Ingestion pipeline must be in the same AWS Region.
+ Your DynamoDB table and OpenSearch Ingestion pipeline must be in the same AWS account.
+ An OpenSearch Ingestion pipeline supports only one DynamoDB table as its source. 
+ DynamoDB Streams only stores data in a log for up to 24 hours. If ingestion from an initial snapshot of a large table takes 24 hours or more, there will be some initial data loss. To mitigate this data loss, estimate the size of the table and configure appropriate compute units of OpenSearch Ingestion pipelines. 
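
As a rough feasibility check, you can estimate whether an initial export fits inside the 24-hour window before the pipeline must start consuming the stream. The per-OCU throughput figure below is an illustrative assumption, not a published number:

```
# Back-of-envelope check: can the initial export be ingested within the
# 24-hour DynamoDB Streams retention window? The throughput-per-OCU value
# is an illustrative assumption, not a published figure.
def export_fits_in_window(table_size_gb: float,
                          ocus: int,
                          gb_per_ocu_per_hour: float = 2.0,
                          window_hours: float = 24.0) -> bool:
    """Return True if the estimated export ingestion time fits in the window."""
    hours_needed = table_size_gb / (ocus * gb_per_ocu_per_hour)
    return hours_needed < window_hours
```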

## Recommended CloudWatch Alarms for DynamoDB
<a name="ddb-pipeline-metrics"></a>

The following CloudWatch metrics are recommended for monitoring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the number of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can set up CloudWatch alarms to perform an action when one of these metrics exceeds a specified value for a specified amount of time.


| Metric | Description | 
| --- |--- |
| dynamodb-pipeline.BlockingBuffer.bufferUsage.value |  Indicates how much of the buffer is being utilized.  | 
|  dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value  |  Shows the total number of OCUs that are actively processing Amazon S3 objects for the export.  | 
|  dynamodb-pipeline.dynamodb.bytesProcessed.count  |  Count of bytes processed from DynamoDB source.  | 
|  dynamodb-pipeline.dynamodb.changeEventsProcessed.count  |  Number of change events processed from DynamoDB stream.  | 
|  dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count  |  Number of errors from change events processed from DynamoDB.  | 
|  dynamodb-pipeline.dynamodb.exportJobFailure.count  | Number of export job submission attempts that have failed. | 
|  dynamodb-pipeline.dynamodb.exportJobSuccess.count  | Number of export jobs that have been submitted successfully. | 
|  dynamodb-pipeline.dynamodb.exportRecordsProcessed.count  |  Total number of records processed from the export.  | 
|  dynamodb-pipeline.dynamodb.exportRecordsTotal.count  |  Total number of records exported from DynamoDB, essential for tracking data export volumes.  | 
|  dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count  | Total number of export data files that have been processed successfully from Amazon S3. | 
|  dynamodb-pipeline.opensearch.bulkBadRequestErrors.count  | Count of errors during bulk requests due to malformed request. | 
|  dynamodb-pipeline.opensearch.bulkRequestLatency.avg  | Average latency for bulk write requests made to OpenSearch. | 
|  dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count  | Number of bulk requests that failed because the target data could not be found. | 
|  dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count  | Number of retries by OpenSearch Ingestion pipelines to write to the OpenSearch cluster. | 
|  dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum  | Total size in bytes of all bulk requests made to OpenSearch. | 
|  dynamodb-pipeline.opensearch.documentErrors.count  | Number of errors when sending documents to OpenSearch. The documents causing the errors will be sent to the DLQ. | 
|  dynamodb-pipeline.opensearch.documentsSuccess.count  | Number of documents successfully written to an OpenSearch cluster or collection. | 
|  dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count  | Number of documents successfully indexed in OpenSearch on the first attempt. | 
|  dynamodb-pipeline.opensearch.documentsVersionConflictErrors.count  | Count of errors due to version conflicts in documents during processing. | 
|  dynamodb-pipeline.opensearch.PipelineLatency.avg  | Average latency of the OpenSearch Ingestion pipeline to process the data, from reading from the source to writing to the destination. | 
|  dynamodb-pipeline.opensearch.PipelineLatency.max  | Maximum latency of the OpenSearch Ingestion pipeline to process the data, from reading from the source to writing to the destination. | 
|  dynamodb-pipeline.opensearch.recordsIn.count  | Count of records successfully ingested into OpenSearch. This metric is essential for tracking the volume of data being processed and stored. | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count  | Number of records that failed to write to DLQ. | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count  | Number of records that are written to DLQ. | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count  | Count of latency measurements for requests to the Amazon S3 dead-letter queue. | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum  | Total latency for all requests to the Amazon S3 dead-letter queue | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum  | Total size in bytes of all requests made to the Amazon S3 dead-letter queue. | 
|  dynamodb-pipeline.recordsProcessed.count  | Total number of records processed in the pipeline, a key metric for overal throughput. | 
|  dynamodb.changeEventsProcessed.count  | No records are being gathered from DynamoDB streams. This could be due to no activitiy on the table, an export being in progress, or an issue accessing the DynamoDB streams. | 
|  `dynamodb.exportJobFailure.count`  | The attempt to trigger an export to S3 failed. | 
|  `dynamodb-pipeline.opensearch.bulkRequestInvalidInputErrors.count`  | Count of bulk request errors in OpenSearch due to invalid input, crucial for monitoring data quality and operational issues. | 
|  opensearch.EndToEndLatency.avg  | The end-to-end latency is higher than desired for reading from DynamoDB streams. This could be due to an underscaled OpenSearch cluster or a maximum pipeline OCU capacity that is too low for the WCU throughput on the DynamoDB table. This end-to-end latency will be high after an export and should decrease over time as the pipeline catches up to the latest DynamoDB streams. | 

# Using an OpenSearch Ingestion pipeline with Amazon DocumentDB
<a name="configure-client-docdb"></a>

You can use the [DocumentDB](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/documentdb/) plugin to stream document changes, such as creates, updates, and deletes, to Amazon OpenSearch Service. The pipeline supports change data capture (CDC), if available, or API polling for high-scale, low-latency streaming.

You can process data with or without a full initial snapshot. A full snapshot captures an entire Amazon DocumentDB collection and uploads it to Amazon S3. The pipeline then sends the data to one or more OpenSearch indexes. After it ingests the snapshot, the pipeline synchronizes ongoing changes to maintain consistency and eventually catches up to near real-time updates.

If you already have a full snapshot from another source, or only need to process new events, you can stream without a snapshot. In this case, the pipeline reads directly from Amazon DocumentDB change streams without an initial bulk load.

If you enable streaming, you must [enable a change stream](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-enabling) on your Amazon DocumentDB collection. However, if you only perform a full load or export, you don’t need a change stream.

## Prerequisites
<a name="s3-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create an Amazon DocumentDB cluster with permission to read data by following the steps in [Create an Amazon DocumentDB cluster](https://docs.aws.amazon.com/documentdb/latest/developerguide/get-started-guide.html#cloud9-cluster) in the *Amazon DocumentDB Developer Guide*. If you use CDC infrastructure, configure your Amazon DocumentDB cluster to publish change streams. 

1. Enable TLS on your Amazon DocumentDB cluster.

1. Choose a VPC CIDR block from a private address space for use with OpenSearch Ingestion.

1. Set up authentication on your Amazon DocumentDB cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Automatically rotating passwords for Amazon DocumentDB](https://docs.aws.amazon.com/documentdb/latest/developerguide/security.managing-users.html#security.managing-users-rotating-passwords). For more information, see [Database access using Role-Based Access Control](https://docs.aws.amazon.com/documentdb/latest/developerguide/role_based_access_control.html) and [Security in Amazon DocumentDB](https://docs.aws.amazon.com/documentdb/latest/developerguide/security.html).

1. If you use a change stream to subscribe to data changes on your Amazon DocumentDB collection, avoid data loss by extending the retention period to up to 7 days using the `change_stream_log_retention_duration` parameter. By default, change stream events are stored for 3 hours after the event is recorded, which isn't enough time for large collections. To modify the change stream retention period, see [Modifying the change stream log retention duration](https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html#change_streams-modifying_log_retention).

1. Create an OpenSearch Service domain or OpenSearch Serverless collection. For more information, see [Creating OpenSearch Service domains](createupdatedomains.md#createdomains) and [Creating collections](serverless-create.md).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your Amazon DocumentDB cluster to your domain or collection. 

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).
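
   For an OpenSearch Serverless collection, the equivalent is a data access policy. The following sketch grants the pipeline role write access to indexes in a collection; the collection name and role ARN are placeholders that you must replace with your own values.

   ```
   [
     {
       "Rules": [
         {
           "Resource": ["index/collection-name/*"],
           "Permission": [
             "aoss:CreateIndex",
             "aoss:UpdateIndex",
             "aoss:DescribeIndex",
             "aoss:WriteDocument"
           ],
           "ResourceType": "index"
         }
       ],
       "Principal": ["arn:aws:iam::444455556666:role/pipeline-role"]
     }
   ]
   ```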

## Step 1: Configure the pipeline role
<a name="docdb-pipeline-role"></a>

After you have your Amazon DocumentDB pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add the following Amazon DocumentDB permissions in the role:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "allowS3ListObjectAccess",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::s3-bucket"
            ],
            "Condition": {
                "StringLike": {
                    "s3:prefix": "s3-prefix/*"
                }
            }
        },
        {
            "Sid": "allowReadAndWriteToS3ForExportStream",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::s3-bucket/s3-prefix/*"
            ]
        },
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:111122223333:network-interface/*",
                "arn:aws:ec2:*:111122223333:subnet/*",
                "arn:aws:ec2:*:111122223333:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/OSISManaged": "true"
                }
            }
        }
    ]
}
```

------

You must provide the above Amazon EC2 permissions on the IAM role that you use to create the OpenSearch Ingestion pipeline because the pipeline uses these permissions to create and delete a network interface in your VPC. The pipeline can only access the Amazon DocumentDB cluster through this network interface.

## Step 2: Create the pipeline
<a name="docdb-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies Amazon DocumentDB as the source. Note that to populate the index name, the `getMetadata` function uses `documentdb_collection` as a metadata key. If you want to use a different index name without the `getMetadata` method, you can use the configuration `index: "my_index_name"`.

```
version: "2"
documentdb-pipeline:
  source:
    documentdb:
      acknowledgments: true
      host: "https://docdb-cluster-id.us-east-1.docdb.amazonaws.com"
      port: 27017
      authentication:
        username: ${aws_secrets:secret:username}
        password: ${aws_secrets:secret:password}
      aws:
        sts_role_arn: "arn:aws:iam::111122223333:role/pipeline-role"
      s3_bucket: "bucket-name"
      s3_region: "bucket-region" 
      s3_prefix: "path" #optional path for storing the temporary data
      collections:
        - collection: "dbname.collection"
          export: true
          stream: true
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      index: "${getMetadata(\"documentdb_collection\")}"
      index_type: custom
      document_id: "${getMetadata(\"primary_key\")}"
      action: "${getMetadata(\"opensearch_action\")}"
      document_version: "${getMetadata(\"document_version\")}"
      document_version_type: "external"
extension:
  aws:
    secrets:
      secret:
        secret_id: "my-docdb-secret"
        region: "us-east-1"
        refresh_interval: PT1H
```
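
As noted above, if you don't need to derive index names from the collection metadata, you can point the sink at a fixed index instead. The sink portion then reduces to a sketch like the following, where the index name is an example:

```
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      index: "my_index_name"
      index_type: custom
```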

You can use a preconfigured Amazon DocumentDB blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

If you're using the AWS Management Console to create your pipeline, you must also attach your pipeline to your VPC in order to use Amazon DocumentDB as a source. To do so, find the **Source network options** section, select the **Attach to VPC** checkbox, and choose your CIDR from one of the provided default options. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).

To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and Amazon DocumentDB, ensure that the Amazon DocumentDB VPC CIDR is different from the CIDR for OpenSearch Ingestion.

For more information, see [Configuring VPC access for a pipeline](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security.html#pipeline-vpc-configure).

## Data consistency
<a name="docdb-pipeline-consistency"></a>

The pipeline ensures data consistency by continuously polling or receiving changes from the Amazon DocumentDB cluster and updating the corresponding documents in the OpenSearch index.

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records in the OpenSearch domain or collection. 

If you want to ingest into an OpenSearch Serverless *search* collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless *time series* collection, note that the pipeline doesn't generate a document ID, so you must omit `document_id: "${getMetadata(\"primary_key\")}"` in your pipeline sink configuration. 

An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in Amazon DocumentDB is reconciled with the corresponding document changes in OpenSearch.

## Mapping data types
<a name="docdb-pipeline-mapping"></a>

OpenSearch Service dynamically maps the Amazon DocumentDB data types in each incoming document to the corresponding OpenSearch data types. The following table shows how OpenSearch Service automatically maps various data types.


| Data type | OpenSearch | Amazon DocumentDB | 
| --- | --- | --- | 
| Integer |  OpenSearch automatically maps Amazon DocumentDB integer values to OpenSearch integers. OpenSearch dynamically maps the field based on the first sent document. If you have a mix of data types for the same attribute in Amazon DocumentDB, automatic mapping might fail.  For example, if your first document has an attribute that is a long, and a later document has that same attribute as an integer, OpenSearch fails to ingest the second document. In these cases, you should provide an explicit mapping template that chooses the most flexible number type, such as the following: <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "MixedNumberField": {<br />     "type": "float"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  |  Amazon DocumentDB supports [integers](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types).  | 
| Long |  OpenSearch automatically maps Amazon DocumentDB long values to OpenSearch longs. OpenSearch dynamically maps the field based on the first sent document. If you have a mix of data types for the same attribute in Amazon DocumentDB, automatic mapping might fail.  For example, if your first document has an attribute that is a long, and a later document has that same attribute as an integer, OpenSearch fails to ingest the second document. In these cases, you should provide an explicit mapping template that chooses the most flexible number type, such as the following: <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "MixedNumberField": {<br />     "type": "float"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  |  Amazon DocumentDB supports [longs](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types).  | 
| String |  OpenSearch automatically maps string values as text. In some situations, such as enumerated values, you can map to the keyword type. The following example shows how to map an Amazon DocumentDB attribute named `PartType` to an OpenSearch keyword. <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "PartType": {<br />     "type": "keyword"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  |  Amazon DocumentDB supports [strings](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types).  | 
| Double |  OpenSearch automatically maps Amazon DocumentDB double values to OpenSearch doubles. OpenSearch dynamically maps the field based on the first sent document. If you have a mix of data types for the same attribute in Amazon DocumentDB, automatic mapping might fail.  For example, if your first document has an attribute that is a long, and a later document has that same attribute as an integer, OpenSearch fails to ingest the second document. In these cases, you should provide an explicit mapping template that chooses the most flexible number type, such as the following: <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "MixedNumberField": {<br />     "type": "float"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | Amazon DocumentDB supports [doubles](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| Date |  By default, date maps to an integer in OpenSearch. You can define a custom mapping template to map a date to an OpenSearch date. <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "myDateField": {<br />     "type": "date",<br />     "format": "epoch_second"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | Amazon DocumentDB supports [dates](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| Timestamp |  By default, timestamp maps to an integer in OpenSearch. You can define a custom mapping template to map a timestamp to an OpenSearch date. <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "myTimestampField": {<br />     "type": "date",<br />     "format": "epoch_second"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | Amazon DocumentDB supports [timestamps](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| Boolean |  OpenSearch maps an Amazon DocumentDB Boolean type to an OpenSearch Boolean type.  |  Amazon DocumentDB supports [Boolean type attributes](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types).  | 
| Decimal |  By default, OpenSearch maps Amazon DocumentDB decimal values to text, which preserves the full precision but limits numeric querying. You can define a custom mapping template to map a decimal to an OpenSearch double: <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "myDecimalField": {<br />     "type": "double"<br />    }<br />   }<br />  }<br /> }<br />}</pre> With this custom mapping, you can query and aggregate the field with double-level precision. The original value retains the full precision in the `_source` property of the OpenSearch document.  | Amazon DocumentDB supports [decimals](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| Regular Expression | The regex type creates nested fields. These include `<myFieldName>.pattern` and `<myFieldName>.options`. |  Amazon DocumentDB supports [regular expressions](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types).  | 
| Binary Data |  OpenSearch automatically maps Amazon DocumentDB binary data to OpenSearch text. You can provide a mapping to write these as binary fields in OpenSearch. The following example shows how to map an Amazon DocumentDB field named `imageData` to an OpenSearch binary field. <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "imageData": {<br />     "type": "binary"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | Amazon DocumentDB supports [binary data fields](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| ObjectId | Fields with a type of objectId map to OpenSearch text fields. The value will be the string representation of the objectId.  | Amazon DocumentDB supports [objectIds](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| Null |  OpenSearch can ingest documents with the Amazon DocumentDB null type. It saves the value as a null value in the document. There is no mapping for this type, and this field is not indexed or searchable. If the same attribute name is used for a null type and later changes to a different type, such as string, OpenSearch creates a dynamic mapping for the first non-null value. Subsequent values can still be Amazon DocumentDB null values.  | Amazon DocumentDB supports [null type fields](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| Undefined |  OpenSearch can ingest documents with the Amazon DocumentDB undefined type. It saves the value as a null value in the document. There is no mapping for this type, and this field is not indexed or searchable. If the same field name is used for an undefined type and later changes to a different type, such as string, OpenSearch creates a dynamic mapping for the first non-undefined value. Subsequent values can still be Amazon DocumentDB undefined values.  | Amazon DocumentDB supports [undefined type fields](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| MinKey |  OpenSearch can ingest documents with the Amazon DocumentDB minKey type. It saves the value as a null value in the document. There is no mapping for this type, and this field is not indexed or searchable. If the same field name is used for a minKey type and later changes to a different type, such as string, OpenSearch creates a dynamic mapping for the first non-minKey value. Subsequent values can still be Amazon DocumentDB minKey values.  | Amazon DocumentDB supports [minKey type fields](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 
| MaxKey |  OpenSearch can ingest documents with the Amazon DocumentDB maxKey type. It saves the value as a null value in the document. There is no mapping for this type, and this field is not indexed or searchable. If the same field name is used for a maxKey type and later changes to a different type, such as string, OpenSearch creates a dynamic mapping for the first non-maxKey value. Subsequent values can still be Amazon DocumentDB maxKey values.  | Amazon DocumentDB supports [maxKey type fields](https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html#mongo-apis-data-types). | 

We recommend that you configure the dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you've configured the queue, OpenSearch Service sends all failed documents that can't be ingested due to dynamic mapping failures to the queue. 
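
In the pipeline configuration, the DLQ is defined on the `opensearch` sink. The following sketch sends failed documents to an S3 prefix; the bucket name, prefix, and role ARN are placeholders that you must replace with your own values.

```
  sink:
  - opensearch:
      # ... hosts, index, and other sink settings ...
      dlq:
        s3:
          bucket: "my-dlq-bucket"
          key_path_prefix: "dlq-files"
          region: "us-east-1"
          sts_role_arn: "arn:aws:iam::111122223333:role/pipeline-role"
```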

If automatic mapping fails, you can use `template_type` and `template_content` in your pipeline configuration to define explicit mapping rules. Alternatively, you can create mapping templates directly in your search domain or collection before you start the pipeline. 
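
For example, to apply the float mapping template from the preceding table through the pipeline itself, the sink might include `template_type` and `template_content` as in the following sketch, where the index name is a placeholder:

```
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      index: "my_index_name"
      template_type: index-template
      template_content: >
        {
          "template": {
            "mappings": {
              "properties": {
                "MixedNumberField": { "type": "float" }
              }
            }
          }
        }
```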

## Limitations
<a name="docdb-pipeline-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for Amazon DocumentDB:
+ The OpenSearch Ingestion integration with Amazon DocumentDB currently doesn't support cross-Region ingestion. Your Amazon DocumentDB cluster and OpenSearch Ingestion pipeline must be in the same AWS Region.
+ The OpenSearch Ingestion integration with Amazon DocumentDB currently doesn't support cross-account ingestion. Your Amazon DocumentDB cluster and OpenSearch Ingestion pipeline must be in the same AWS account.
+ An OpenSearch Ingestion pipeline supports only one Amazon DocumentDB cluster as its source. 
+ The OpenSearch Ingestion integration with Amazon DocumentDB specifically supports Amazon DocumentDB instance-based clusters. It doesn't support Amazon DocumentDB elastic clusters.
+ The OpenSearch Ingestion integration only supports AWS Secrets Manager as an authentication mechanism for your Amazon DocumentDB cluster.
+ You can't update the existing pipeline configuration to ingest data from a different database or collection. Instead, you must create a new pipeline. 

## Recommended CloudWatch alarms
<a name="cloudwatch-metrics-docdb"></a>

For the best performance, we recommend that you use the following CloudWatch alarms when you create an OpenSearch Ingestion pipeline to access an Amazon DocumentDB cluster as a source.


| CloudWatch Alarm | Description | 
| --- | --- | 
| <pipeline-name>.documentdb.credentialsChanged | This metric indicates how often AWS secrets are rotated.  | 
| <pipeline-name>.documentdb.executorRefreshErrors | This metric indicates failures to refresh AWS secrets.  | 
| <pipeline-name>.documentdb.exportRecordsTotal |  This metric indicates the number of records exported from Amazon DocumentDB.  | 
| <pipeline-name>.documentdb.exportRecordsProcessed | This metric indicates the number of records processed by the OpenSearch Ingestion pipeline.  | 
| <pipeline-name>.documentdb.exportRecordProcessingErrors |  This metric indicates the number of processing errors in an OpenSearch Ingestion pipeline while reading the data from an Amazon DocumentDB cluster.  | 
| <pipeline-name>.documentdb.exportRecordsSuccessTotal |  This metric indicates the total number of export records processed successfully.  | 
| <pipeline-name>.documentdb.exportRecordsFailedTotal |  This metric indicates the total number of export records that failed to process.  | 
| <pipeline-name>.documentdb.bytesReceived |  This metric indicates the total number of bytes received by an OpenSearch Ingestion pipeline.  | 
| <pipeline-name>.documentdb.bytesProcessed |  This metric indicates the total number of bytes processed by an OpenSearch Ingestion pipeline.  | 
| <pipeline-name>.documentdb.exportPartitionQueryTotal |  This metric indicates the total number of export partitions.  | 
| <pipeline-name>.documentdb.streamRecordsSuccessTotal |  This metric indicates the number of records successfully processed from the stream.  | 
| <pipeline-name>.documentdb.streamRecordsFailedTotal |  This metric indicates the total number of records that failed to process from the stream.  | 

# Using an OpenSearch Ingestion pipeline with Confluent Cloud Kafka
<a name="configure-client-confluent-kafka"></a>

You can use an OpenSearch Ingestion pipeline to stream data from Confluent Cloud Kafka clusters to Amazon OpenSearch Service domains and OpenSearch Serverless collections. OpenSearch Ingestion supports both public and private network configurations for the streaming of data from Confluent Cloud Kafka clusters to domains or collections managed by OpenSearch Service or OpenSearch Serverless. 

## Connectivity to Confluent Cloud public Kafka clusters
<a name="confluent-cloud-kafka-public"></a>

You can use OpenSearch Ingestion pipelines to migrate data from a Confluent Cloud Kafka cluster with a public configuration, which means that the cluster's DNS name can be publicly resolved. To do so, set up an OpenSearch Ingestion pipeline with the Confluent Cloud public Kafka cluster as the source and OpenSearch Service or OpenSearch Serverless as the destination. This processes your streaming data from a self-managed source cluster to an AWS-managed destination domain or collection. 

### Prerequisites
<a name="confluent-cloud-kafka-public-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a Confluent Cloud Kafka cluster to act as the source. The cluster should contain the data that you want to ingest into OpenSearch Service.

1. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see [Creating OpenSearch Service domains](createupdatedomains.md#createdomains) and [Creating collections](serverless-create.md).

1. Set up authentication on your Confluent Cloud Kafka cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection. 

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).

### Step 1: Configure the pipeline role
<a name="confluent-cloud-kafka-public-pipeline-role"></a>

After you have your Confluent Cloud Kafka cluster pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add permission to write to an OpenSearch Service domain or OpenSearch Serverless collection, as well as permission to read secrets from Secrets Manager.

The following permissions are needed to manage network interfaces: 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:us-east-1:111122223333:network-interface/*",
                "arn:aws:ec2:us-east-1:111122223333:subnet/*",
                "arn:aws:ec2:us-east-1:111122223333:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "arn:aws:ec2:us-east-1:111122223333:subnet/*"
        },
        { 
            "Effect": "Allow",
            "Action": [ "ec2:CreateTags" ],
            "Resource": "arn:aws:ec2:us-east-1:111122223333:network-interface/*",
            "Condition": { 
               "StringEquals": { "aws:RequestTag/OSISManaged": "true" } 
            } 
        }
    ]
}
```

------

The following permissions are needed to read secrets from AWS Secrets Manager:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": ["secretsmanager:GetSecretValue"],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:,secret-name"]
        }
    ]
}
```

------

The following domain access policy allows the pipeline role to write to an Amazon OpenSearch Service domain:

```
{
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::account-id:role/pipeline-role"
      },
      "Action": ["es:DescribeDomain", "es:ESHttp*"],
      "Resource": "arn:aws:es:region:account-id:domain/domain-name/*"
    }
  ]
}
```

### Step 2: Create the pipeline
<a name="confluent-cloud-kafka-public-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies your Confluent Cloud Kafka cluster as the source. 

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.

You can also migrate data from a source Confluent Kafka cluster to an OpenSearch Serverless VPC collection. Ensure you provide a network access policy within the pipeline configuration. You can use a Confluent schema registry to define a Confluent schema.

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```
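
As a sketch of the conditional routing described above, the following hypothetical configuration uses Data Prepper's `route` option to split events between two domains. The route names, the `/log_type` condition field, and the domain endpoints and index names are all placeholders:

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      # ... same Confluent Cloud Kafka source configuration as above ...
  route:
    # Route names map events to sinks based on a field in the incoming data.
    - audit-events: '/log_type == "audit"'
    - app-events: '/log_type == "application"'
  sink:
    - opensearch:
        hosts: ["https://search-audit-domain.us-east-1.es.amazonaws.com"]
        index: "audit-index"
        routes: ["audit-events"]
        aws:
          region: "us-east-1"
    - opensearch:
        hosts: ["https://search-app-domain.us-east-1.es.amazonaws.com"]
        index: "app-index"
        routes: ["app-events"]
        aws:
          region: "us-east-1"
```

Events that match a route's condition are delivered only to the sinks that list that route name; omitting `routes` on a sink replicates all events to it.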

You can use a preconfigured blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

### Connectivity to Confluent Cloud Kafka clusters in a VPC
<a name="confluent-cloud-kafka-private"></a>

You can also use OpenSearch Ingestion pipelines to migrate data from a Confluent Cloud Kafka cluster running in a VPC. To do so, set up an OpenSearch Ingestion pipeline with a Confluent Cloud Kafka cluster as a source and OpenSearch Service or OpenSearch Serverless as the destination. This processes your streaming data from a Confluent Cloud Kafka source cluster to an AWS-managed destination domain or collection. 

OpenSearch Ingestion supports Confluent Cloud Kafka clusters in all of the network modes that Confluent supports. The following network configurations are supported as a source in OpenSearch Ingestion:
+ AWS VPC peering
+ AWS PrivateLink for Dedicated clusters
+ AWS PrivateLink for Enterprise clusters
+ AWS Transit Gateway

#### Prerequisites
<a name="confluent-cloud-kafka-private-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a Confluent Cloud Kafka cluster with a VPC network configuration that contains the data you want to ingest into OpenSearch Service. 

1. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see [Creating OpenSearch Service domains](createupdatedomains.md#createdomains) and [Creating collections](serverless-create.md).

1. Set up authentication on your Confluent Cloud Kafka cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html).

1. Obtain the ID of the VPC that has access to the Confluent Cloud Kafka cluster. Choose the VPC CIDR to be used by OpenSearch Ingestion.
**Note**  
If you're using the AWS Management Console to create your pipeline, you must also attach your OpenSearch Ingestion pipeline to your VPC in order to use a Confluent Cloud Kafka cluster. To do so, find the **Network configuration** section, select the **Attach to VPC** checkbox, and choose your CIDR from one of the provided default options, or select your own. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).  
To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and self-managed OpenSearch, ensure that the self-managed OpenSearch VPC CIDR is different from the CIDR for OpenSearch Ingestion.

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection. 
**Note**  
If you are using AWS PrivateLink to connect to your Confluent Cloud Kafka cluster, you need to configure [VPC DHCP options](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_DHCP_Options.html). *DNS hostnames* and *DNS resolution* should be enabled.  
Specifically, use the following option set values:  
**Enterprise clusters:**  

   ```
   domain-name: aws.private.confluent.cloud
   domain-name-servers: AmazonProvidedDNS
   ```
**Dedicated clusters:**  

   ```
   domain-name: aws.confluent.cloud
   domain-name-servers: AmazonProvidedDNS
   ```
This change ensures that DNS resolution for the Confluent PrivateLink endpoint works correctly within the VPC.

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).
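
The private-CIDR requirement from the prerequisites above can be checked mechanically. The following stand-alone helper is illustrative only (it is not part of any AWS SDK) and uses just the Python standard library to confirm that a candidate pipeline CIDR falls inside RFC 1918 private address space:

```python
# Illustrative helper: verify a candidate OpenSearch Ingestion CIDR is in
# RFC 1918 private address space, using only the standard library.
import ipaddress

RFC1918_BLOCKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]

def is_rfc1918(cidr: str) -> bool:
    """True if every address in the CIDR is within an RFC 1918 private block."""
    net = ipaddress.ip_network(cidr, strict=False)
    return any(net.subnet_of(block) for block in RFC1918_BLOCKS)

print(is_rfc1918("10.99.20.0/24"))   # True
print(is_rfc1918("100.64.0.0/10"))   # False: shared address space, not RFC 1918
```

A check like this is also a convenient place to confirm that the chosen CIDR doesn't overlap the CIDR of the VPC that hosts your Kafka cluster.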

#### Step 1: Configure the pipeline role
<a name="confluent-cloud-kafka-private-pipeline-role"></a>

After you have your pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add the following permissions in the role:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        { 
            "Effect": "Allow",
            "Action": [ 
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": { 
               "StringEquals": 
                    {
                        "aws:RequestTag/OSISManaged": "true"
                    } 
            } 
        }
    ]
}
```

------

You must provide the preceding Amazon EC2 permissions on the IAM role that you use to create the OpenSearch Ingestion pipeline because the pipeline uses these permissions to create and delete a network interface in your VPC. The pipeline can access the Kafka cluster only through this network interface.

#### Step 2: Create the pipeline
<a name="self-managed-kafka-private-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies your Confluent Cloud Kafka cluster as the source.

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.

You can also migrate data from a source Confluent Kafka cluster to an OpenSearch Serverless VPC collection. Ensure you provide a network access policy within the pipeline configuration. You can use a Confluent schema registry to define a Confluent schema.

```
 version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-2"
```
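
If your destination is an OpenSearch Serverless VPC collection rather than a domain, as mentioned above, the sink takes a different shape. The following sketch assumes a placeholder collection endpoint and a network access policy named `network-policy-name`:

```
  sink:
    - opensearch:
        hosts: ["https://collection-id.us-east-1.aoss.amazonaws.com"]
        index: "confluent-index"
        aws:
          serverless: true
          # Network access policy that grants the pipeline VPC access to the collection
          serverless_options:
            network_policy_name: "network-policy-name"
          region: "us-east-1"
```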

# Using an OpenSearch Ingestion pipeline with Amazon Managed Streaming for Apache Kafka
<a name="configure-client-msk"></a>

You can use the [Kafka plugin](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/) to ingest data from [Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/msk/latest/developerguide/) (Amazon MSK) into your OpenSearch Ingestion pipeline. With Amazon MSK, you can build and run applications that use Apache Kafka to process streaming data. OpenSearch Ingestion uses AWS PrivateLink to connect to Amazon MSK. You can ingest data from both Amazon MSK and Amazon MSK Serverless clusters. The only difference between the two processes is the prerequisite steps you must take before you set up your pipeline.

**Topics**
+ [Provisioned Amazon MSK prerequisites](#msk-prereqs)
+ [Amazon MSK Serverless prerequisites](#msk-serverless-prereqs)
+ [Step 1: Configure a pipeline role](#msk-pipeline-role)
+ [Step 2: Create the pipeline](#msk-pipeline)
+ [Step 3: (Optional) Use the AWS Glue Schema Registry](#msk-glue)
+ [Step 4: (Optional) Configure recommended compute units (OCUs) for the Amazon MSK pipeline](#msk-ocu)

## Provisioned Amazon MSK prerequisites
<a name="msk-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create an Amazon MSK provisioned cluster by following the steps in [Creating a cluster](https://docs.aws.amazon.com/msk/latest/developerguide/msk-create-cluster.html#create-cluster-console) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*. For **Broker type**, choose any option except for `t3` types, as these aren't supported by OpenSearch Ingestion.

1. After the cluster has an **Active** status, follow the steps in [Turn on multi-VPC connectivity](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on).

1. Follow the steps in [Attach a cluster policy to the MSK cluster](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy) to attach one of the following policies, depending on whether your cluster and pipeline are in the same AWS account. This policy allows OpenSearch Ingestion to create an AWS PrivateLink connection to your Amazon MSK cluster and read data from Kafka topics. Make sure that you update the `resource` with your own ARN. 

   The following policy applies when your cluster and pipeline are in the same AWS account:

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   If your Amazon MSK cluster is in a different AWS account than your pipeline, attach the following policy instead. Note that cross-account access is only possible with provisioned Amazon MSK clusters and not Amazon MSK Serverless clusters. The ARN for the AWS `principal` should be the ARN for the same pipeline role that you provide to your pipeline configuration:

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "kafka-cluster:*",
           "kafka:*"
         ],
         "Resource": [
           "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id",
           "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*",
           "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
         ]
       }
     ]
   }
   ```

------

1. Create a Kafka topic by following the steps in [Create a topic](https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html). Make sure that `BootstrapServerString` is one of the private endpoint (single-VPC) bootstrap URLs. The value for `--replication-factor` should be `2` or `3`, based on the number of zones your Amazon MSK cluster has. The value for `--partitions` should be at least `10`.

1. Produce and consume data by following the steps in [Produce and consume data](https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html). Again, make sure that `BootstrapServerString` is one of your private endpoint (single-VPC) bootstrap URLs.
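
The topic guidance in the last two steps can be restated as a small helper. This is an illustrative sketch only, not an AWS or Kafka API:

```python
# Illustrative helper restating the topic guidance above: --replication-factor
# should be 2 or 3 based on the cluster's number of zones, and --partitions
# should be at least 10.

MIN_PARTITIONS = 10

def recommended_topic_settings(zones: int, partitions: int) -> dict:
    """Suggest Kafka topic settings following the guidance in this section."""
    replication_factor = 3 if zones >= 3 else 2
    return {
        "replication_factor": replication_factor,
        # Raise low partition counts to the recommended minimum.
        "partitions": max(partitions, MIN_PARTITIONS),
    }

print(recommended_topic_settings(zones=3, partitions=4))
# {'replication_factor': 3, 'partitions': 10}
```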

## Amazon MSK Serverless prerequisites
<a name="msk-serverless-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create an Amazon MSK Serverless cluster by following the steps in [Create an MSK Serverless cluster](https://docs.aws.amazon.com/msk/latest/developerguide/create-serverless-cluster.html#) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

1. After the cluster has an **Active** status, follow the steps in [Attach a cluster policy to the MSK cluster](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-policy) to attach the following policy. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",		 	 	 
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       },
       {
         "Effect": "Allow",
         "Principal": {
           "Service": "osis-pipelines.amazonaws.com"
         },
         "Action": [
           "kafka:CreateVpcConnection",
           "kafka:GetBootstrapBrokers",
           "kafka:DescribeClusterV2"
         ],
         "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
       }
     ]
   }
   ```

------

   This policy allows OpenSearch Ingestion to create an AWS PrivateLink connection to your Amazon MSK Serverless cluster and read data from Kafka topics. This policy applies when your cluster and pipeline are in the same AWS account, which must be the case because Amazon MSK Serverless doesn't support cross-account access.

1. Create a Kafka topic by following the steps in [Create a topic](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-create-topic.html). Make sure that `BootstrapServerString` is one of your Simple Authentication and Security Layer (SASL) IAM bootstrap URLs. The value for `--replication-factor` should be `2` or `3`, based on the number of zones your Amazon MSK Serverless cluster has. The value for `--partitions` should be at least `10`.

1. Produce and consume data by following the steps in [Produce and consume data](https://docs.aws.amazon.com/msk/latest/developerguide/msk-serverless-produce-consume.html). Again, make sure that `BootstrapServerString` is one of your Simple Authentication and Security Layer (SASL) IAM bootstrap URLs.

## Step 1: Configure a pipeline role
<a name="msk-pipeline-role"></a>

After you have your Amazon MSK provisioned or serverless cluster set up, add the following Kafka permissions to the pipeline role that you want to use in your pipeline configuration:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/topic-name"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*"
            ]
        }
    ]
}
```

------

## Step 2: Create the pipeline
<a name="msk-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies Kafka as the source:

```
version: "2"
log-pipeline:
  source:
    kafka:
      acknowledgements: true
      topics:
      - name: "topic-name"
        group_id: "grouplambd-id"
      aws:
        msk:
          arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id"
        region: "us-west-2"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
      index: "index_name"
      aws_region: "region"
      aws_sigv4: true
```

You can use a preconfigured Amazon MSK blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

## Step 3: (Optional) Use the AWS Glue Schema Registry
<a name="msk-glue"></a>

When you use OpenSearch Ingestion with Amazon MSK, you can use the AVRO data format for schemas hosted in the AWS Glue Schema Registry. With the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html), you can centrally discover, control, and evolve data stream schemas. 

To use this option, enable the schema `type` in your pipeline configuration:

```
schema:
  type: "aws_glue"
```

You must also provide AWS Glue with read access permissions in your pipeline role. You can use the AWS managed policy called [AWSGlueSchemaRegistryReadonlyAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSGlueSchemaRegistryReadonlyAccess.html). Additionally, your registry must be in the same AWS account and Region as your OpenSearch Ingestion pipeline.

## Step 4: (Optional) Configure recommended compute units (OCUs) for the Amazon MSK pipeline
<a name="msk-ocu"></a>

Each compute unit has one consumer per topic. Brokers balance partitions among these consumers for a given topic. However, when the number of partitions is greater than the number of consumers, Amazon MSK hosts multiple partitions on every consumer. OpenSearch Ingestion has built-in auto scaling to scale up or down based on CPU usage or number of pending records in the pipeline. 

For optimal performance, distribute your partitions across many compute units for parallel processing. If your topics have a large number of partitions (for example, more than 96, which is the maximum number of OCUs per pipeline), we recommend that you configure the pipeline with 1–96 OCUs so that it automatically scales as needed. If a topic has a low number of partitions (for example, fewer than 96), keep the maximum number of OCUs the same as the number of partitions. 

When a pipeline has more than one topic, choose the topic with the highest number of partitions as a reference for configuring the maximum number of OCUs. By adding another pipeline with a new set of OCUs to the same topic and consumer group, you can scale the throughput almost linearly.
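
The sizing guidance above can be summarized as a small helper. `recommended_max_ocus` is a hypothetical illustration, not an OpenSearch Ingestion API:

```python
# Illustrative sketch of the OCU sizing guidance above: size from the topic
# with the most partitions, capped at the 96-OCU per-pipeline maximum.

MAX_OCUS_PER_PIPELINE = 96  # current OpenSearch Ingestion maximum

def recommended_max_ocus(partitions_per_topic: dict) -> int:
    """Suggest a max OCU setting for a pipeline reading these topics."""
    busiest = max(partitions_per_topic.values())
    # With more partitions than OCUs, auto scaling works within 1-96;
    # otherwise match the partition count so each consumer owns one partition.
    return min(busiest, MAX_OCUS_PER_PIPELINE)

print(recommended_max_ocus({"orders": 12, "clicks": 48}))   # topic "clicks" drives sizing
print(recommended_max_ocus({"events": 300}))                # capped at 96
```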

# Using an OpenSearch Ingestion pipeline with Amazon RDS
<a name="configure-client-rds"></a>

You can use an OpenSearch Ingestion pipeline with Amazon RDS to export existing data and stream changes (such as create, update, and delete) to Amazon OpenSearch Service domains and collections. The OpenSearch Ingestion pipeline incorporates change data capture (CDC) infrastructure to provide a high-scale, low-latency way to continuously stream data from Amazon RDS. RDS for MySQL and RDS for PostgreSQL are supported.

There are two ways that you can use Amazon RDS as a source to process data: with or without a full initial snapshot. A full initial snapshot is a snapshot of the specified tables that is exported to Amazon S3. From there, an OpenSearch Ingestion pipeline sends it to one index in a domain, or partitions it across multiple indexes in a domain. To keep the data in Amazon RDS and OpenSearch consistent, the pipeline syncs all of the create, update, and delete events in the Amazon RDS tables with the documents saved in the OpenSearch index or indexes.

When you use a full initial snapshot, your OpenSearch Ingestion pipeline first ingests the snapshot and then starts reading data from Amazon RDS change streams. It eventually catches up and maintains near real-time data consistency between Amazon RDS and OpenSearch. 

You can also use the OpenSearch Ingestion integration with Amazon RDS to track change data capture and ingest all updates in an Amazon RDS database to OpenSearch. Choose this option if you already have a full snapshot from some other mechanism, or if you just want to capture all changes to the data in an Amazon RDS instance. 

When you choose this option, you need to [configure Amazon RDS for MySQL binary logging](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_LogAccess.MySQL.BinaryFormat.html) or [set up logical replication for an Amazon RDS for PostgreSQL DB instance](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.pglogical.setup-replication.html). 

**Topics**
+ [RDS for MySQL](rds-mysql.md)
+ [RDS for PostgreSQL](rds-PostgreSQL.md)

# RDS for MySQL
<a name="rds-mysql"></a>

Complete the following steps to configure an OpenSearch Ingestion pipeline with RDS for MySQL.

**Topics**
+ [RDS for MySQL prerequisites](#rds-mysql-prereqs)
+ [Step 1: Configure the pipeline role](#rds-mysql-pipeline-role)
+ [Step 2: Create the pipeline](#rds-mysql-pipeline)
+ [Data consistency](#rds-mysql-pipeline-consistency)
+ [Mapping data types](#rds-mysql-pipeline-mapping)
+ [Limitations](#rds-mysql-pipeline-limitations)
+ [Recommended CloudWatch Alarms](#aurora-mysql-pipeline-metrics)

## RDS for MySQL prerequisites
<a name="rds-mysql-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a custom DB parameter group in Amazon RDS to configure binary logging and set the following parameters.

   ```
   binlog_format=ROW
   binlog_row_image=full
   binlog_row_metadata=FULL
   ```

   Additionally, make sure the `binlog_row_value_options` parameter is not set to `PARTIAL_JSON`.

   For more information, see [Configuring RDS for MySQL binary logging](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_LogAccess.MySQL.BinaryFormat.html).

1. [Select or create an RDS for MySQL DB instance](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_CreateDBInstance.html) and associate the parameter group created in the previous step with the DB instance.

1. Verify that automated backups are enabled on the database. For more information, see [Enabling automated backups](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithAutomatedBackups.Enabling.html). 

1. Configure binary log retention with enough time for replication to occur, for example 24 hours. For more information, see [Setting and showing binary log configuration](https://docs.aws.amazon.com//AmazonRDS/latest/UserGuide/mysql-stored-proc-configuring.html) in the *Amazon RDS User Guide*.

1. Set up username and password authentication on your Amazon RDS instance using [password management with Amazon RDS and AWS Secrets Manager](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/rds-secrets-manager.html). You can also create a username/password combination by [creating a Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html).

1. If you use the full initial snapshot feature, create an AWS KMS key and an IAM role for exporting data from Amazon RDS to Amazon S3.

   The IAM role should have the following permission policy:

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ExportPolicy",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject*",
                   "s3:ListBucket",
                   "s3:GetObject*",
                   "s3:DeleteObject*",
                   "s3:GetBucketLocation"
               ],
               "Resource": [
                   "arn:aws:s3:::s3-bucket-used-in-pipeline",
                   "arn:aws:s3:::s3-bucket-used-in-pipeline/*"
               ]
           }
       ]
   }
   ```

------

   The role should also have the following trust relationships:

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "export.rds.amazonaws.com"
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

1. Select or create an OpenSearch Service domain or OpenSearch Serverless collection. For more information, see [Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains) and [Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your Amazon RDS DB instance to your domain or collection.

## Step 1: Configure the pipeline role
<a name="rds-mysql-pipeline-role"></a>

After you have your Amazon RDS pipeline prerequisites set up, [configure the pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) to use in your pipeline configuration. Also add the following permissions for Amazon RDS source to the role:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowReadingFromS3Buckets",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::s3_bucket",
                "arn:aws:s3:::s3_bucket/*"
            ]
        },
        {
            "Sid": "allowNetworkInterfacesActions",
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:111122223333:network-interface/*",
                "arn:aws:ec2:*:111122223333:subnet/*",
                "arn:aws:ec2:*:111122223333:security-group/*"
            ]
        },
        {
            "Sid": "allowDescribeEC2",
            "Effect": "Allow",
            "Action": [
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "allowTagCreation",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:111122223333:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/OSISManaged": "true"
                }
            }
        },
        {
            "Sid": "AllowDescribeInstances",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBInstances"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:db:*"
            ]
        },
        {
            "Sid": "AllowSnapshots",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBSnapshots",
                "rds:CreateDBSnapshot",
                "rds:AddTagsToResource"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:db:DB-id",
                "arn:aws:rds:us-east-2:111122223333:snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowExport",
            "Effect": "Allow",
            "Action": [
                "rds:StartExportTask"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowDescribeExports",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeExportTasks"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": "us-east-2",
                    "aws:ResourceAccount": "111122223333"
                }
            }
        },
        {
            "Sid": "AllowAccessToKmsForExport",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:Encrypt",
                "kms:DescribeKey",
                "kms:RetireGrant",
                "kms:CreateGrant",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*"
            ],
            "Resource": [
                "arn:aws:kms:us-east-2:111122223333:key/export-key-id"
            ]
        },
        {
            "Sid": "AllowPassingExportRole",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::111122223333:role/export-role"
            ]
        },
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "arn:aws:secretsmanager:*:111122223333:secret:*"
            ]
        }
    ]
}
```

------
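
The policy above uses placeholder values (`111122223333`, `us-east-2`, `s3_bucket`, `DB-id`, and so on) that you must replace with your own. One way to avoid missing a placeholder is to substitute them programmatically and confirm the result still parses; a minimal sketch in Python (the substitution values and the abbreviated policy string here are illustrative):

```python
import json

# Placeholder -> real value. The keys come from the example policy above;
# the values are illustrative.
substitutions = {
    "111122223333": "123456789012",    # your AWS account ID
    "us-east-2": "us-east-1",          # your Region
    "s3_bucket": "my-pipeline-bucket"  # your export bucket name
}

# Abbreviated copy of the policy for illustration; in practice, read the
# full policy document from a file.
policy_text = (
    '{"Version": "2012-10-17", "Statement": [{"Sid": "allowReadingFromS3Buckets",'
    ' "Effect": "Allow", "Action": ["s3:GetObject"],'
    ' "Resource": ["arn:aws:s3:::s3_bucket/*"]}]}'
)

for placeholder, value in substitutions.items():
    policy_text = policy_text.replace(placeholder, value)

policy = json.loads(policy_text)  # confirm the result is still valid JSON
print(policy["Statement"][0]["Resource"][0])  # arn:aws:s3:::my-pipeline-bucket/*
```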

## Step 2: Create the pipeline
<a name="rds-mysql-pipeline"></a>

Configure an OpenSearch Ingestion pipeline similar to the following. The example pipeline specifies an Amazon RDS instance as the source. 

```
version: "2"
rds-mysql-pipeline:
  source:
    rds:
      db_identifier: "instance-id"
      engine: mysql
      database: "database-name"
      tables:
        include:
          - "table1"
          - "table2"
      s3_bucket: "bucket-name"
      s3_region: "bucket-region"
      s3_prefix: "prefix-name"
      export:
        kms_key_id: "kms-key-id"
        iam_role_arn: "export-role-arn"
      stream: true
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
      authentication:
        username: ${{aws_secrets:secret:username}}
        password: ${{aws_secrets:secret:password}}
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
        index: "${getMetadata(\"table_name\")}"
        index_type: custom
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
extension:
  aws:
    secrets:
      secret:
        secret_id: "rds-secret-id"
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        refresh_interval: PT1H
```

You can use a preconfigured Amazon RDS blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

To use Amazon RDS as a source, you need to configure VPC access for the pipeline. The VPC you choose should be the same VPC that your Amazon RDS source uses. Then choose one or more subnets and one or more VPC security groups. Note that the pipeline needs network access to the RDS for MySQL database, so you should also verify that your DB instance is configured with a VPC security group that allows inbound traffic from the pipeline's VPC security group to the database port. For more information, see [Controlling access with security groups](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Overview.RDSSecurityGroups.html).

If you're using the AWS Management Console to create your pipeline, you must also attach your pipeline to your VPC in order to use Amazon RDS as a source. To do this, find the **Network configuration** section, choose **Attach to VPC**, and choose a CIDR from one of the provided default options, or select your own. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).

To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and Amazon RDS, ensure that the Amazon RDS VPC CIDR is different from the CIDR for OpenSearch Ingestion.
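
Before entering a custom CIDR, you can check that it is private and doesn't collide with your database VPC range. A short sketch using Python's standard `ipaddress` module (both CIDR values below are illustrative):

```python
import ipaddress

# Illustrative values: your RDS VPC CIDR and a candidate pipeline CIDR.
rds_vpc_cidr = ipaddress.ip_network("10.0.0.0/16")
pipeline_cidr = ipaddress.ip_network("172.31.0.0/24")

# The candidate must be private (RFC 1918) and must not overlap the VPC range.
ok = pipeline_cidr.is_private and not pipeline_cidr.overlaps(rds_vpc_cidr)
print(ok)  # True for these values
```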

For more information, see [Configuring VPC access for a pipeline](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security.html#pipeline-vpc-configure).

## Data consistency
<a name="rds-mysql-pipeline-consistency"></a>

The pipeline ensures data consistency by continuously polling or receiving changes from the Amazon RDS instance and updating the corresponding documents in the OpenSearch index.

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records into the OpenSearch domain or collection. If you want to ingest into an OpenSearch Serverless search collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless time series collection, note that the pipeline doesn't generate a document ID, so you must omit `document_id: "${getMetadata(\"primary_key\")}"` in your pipeline sink configuration. 

An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in Amazon RDS is reconciled with the corresponding document changes in OpenSearch.
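
Conceptually, this reconciliation translates each change event into the corresponding bulk action. A simplified sketch of that mapping (the event-type and field names here are illustrative, not the pipeline's internal representation):

```python
# Map change-event types to OpenSearch bulk actions.
# Inserts and updates both become "index" (an upsert keyed by document ID);
# deletes remove the corresponding document.
BULK_ACTION = {"insert": "index", "update": "index", "delete": "delete"}

def to_bulk_action(event):
    """Translate one change event into a bulk-request action line."""
    action = BULK_ACTION[event["op"]]
    return {action: {"_index": event["table"], "_id": event["primary_key"]}}

print(to_bulk_action({"op": "update", "table": "table1", "primary_key": "42"}))
# {'index': {'_index': 'table1', '_id': '42'}}
```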

## Mapping data types
<a name="rds-mysql-pipeline-mapping"></a>

An OpenSearch Ingestion pipeline maps MySQL data types to representations that are suitable for OpenSearch Service domains or collections to consume. If no mapping template is defined in OpenSearch, OpenSearch automatically determines field types with [dynamic mapping](https://docs.opensearch.org/latest/field-types/#dynamic-mapping) based on the first document sent. You can also explicitly define the field types that work best for you in OpenSearch through a mapping template. 

The table below lists MySQL data types and the corresponding OpenSearch field types. The *Default OpenSearch Field Type* column shows the field type that OpenSearch assigns with dynamic mapping if no explicit mapping is defined. The *Recommended OpenSearch Field Type* column shows the field type that we recommend you explicitly specify in a mapping template. These field types align more closely with the data types in MySQL and usually enable better search features in OpenSearch.


| MySQL Data Type | Default OpenSearch Field Type | Recommended OpenSearch Field Type | 
| --- | --- | --- | 
| BIGINT | long | long | 
| BIGINT UNSIGNED | long | unsigned long | 
| BIT | long | byte, short, integer, or long (depending on the number of bits) | 
| DECIMAL | text | double or keyword | 
| DOUBLE | float | double | 
| FLOAT | float | float | 
| INT | long | integer | 
| INT UNSIGNED | long | long | 
| MEDIUMINT | long | integer | 
| MEDIUMINT UNSIGNED | long | integer | 
| NUMERIC | text | double or keyword | 
| SMALLINT | long | short | 
| SMALLINT UNSIGNED | long | integer | 
| TINYINT | long | byte | 
| TINYINT UNSIGNED | long | short | 
| BINARY | text | binary | 
| BLOB | text | binary | 
| CHAR | text | text | 
| ENUM | text | keyword | 
| LONGBLOB | text | binary | 
| LONGTEXT | text | text | 
| MEDIUMBLOB | text | binary | 
| MEDIUMTEXT | text | text | 
| SET | text | keyword | 
| TEXT | text | text | 
| TINYBLOB | text | binary | 
| TINYTEXT | text | text | 
| VARBINARY | text | binary | 
| VARCHAR | text | text | 
| DATE | long (in epoch milliseconds) | date | 
| DATETIME | long (in epoch milliseconds) | date | 
| TIME | long (in epoch milliseconds) | date | 
| TIMESTAMP | long (in epoch milliseconds) | date | 
| YEAR | long (in epoch milliseconds) | date | 
| GEOMETRY | text (in WKT format) | geo_shape | 
| GEOMETRYCOLLECTION | text (in WKT format) | geo_shape | 
| LINESTRING | text (in WKT format) | geo_shape | 
| MULTILINESTRING | text (in WKT format) | geo_shape | 
| MULTIPOINT | text (in WKT format) | geo_shape | 
| MULTIPOLYGON | text (in WKT format) | geo_shape | 
| POINT | text (in WKT format) | geo_point or geo_shape | 
| POLYGON | text (in WKT format) | geo_shape | 
| JSON | text | object | 

We recommend that you configure a dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you configure a queue, OpenSearch Service sends to it all documents that fail to be ingested because of dynamic mapping failures.

If automatic mappings fail, you can use `template_type` and `template_content` in your pipeline configuration to define explicit mapping rules. Alternatively, you can create mapping templates directly in your search domain or collection before you start the pipeline.
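
For example, to index MySQL `DATETIME` values as dates and `DECIMAL` values as doubles rather than accepting the dynamic defaults, you could supply a template like the following. This is a sketch that builds and serializes such a mapping; the index pattern and field names are hypothetical, and the OpenSearch types follow the *Recommended* column of the table above:

```python
import json

# Hypothetical explicit mapping for a table named "table1". The field
# names are illustrative; only the OpenSearch types come from the
# recommendations above (DATETIME -> date, DECIMAL -> double, POINT -> geo_point).
template_content = {
    "index_patterns": ["table1*"],
    "template": {
        "mappings": {
            "properties": {
                "created_at": {"type": "date"},
                "price": {"type": "double"},
                "location": {"type": "geo_point"}
            }
        }
    }
}

# Pipeline configuration expects the template as a JSON string.
template_json = json.dumps(template_content)
print(len(template_json) > 0)
```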

## Limitations
<a name="rds-mysql-pipeline-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for RDS for MySQL:
+ The integration only supports one MySQL database per pipeline.
+ The integration does not currently support cross-region data ingestion; your Amazon RDS instance and OpenSearch domain must be in the same AWS Region.
+ The integration does not currently support cross-account data ingestion; your Amazon RDS instance and OpenSearch Ingestion pipeline must be in the same AWS account. 
+ Ensure that the Amazon RDS instance has authentication enabled using AWS Secrets Manager, which is the only supported authentication mechanism.
+ You can't update an existing pipeline configuration to ingest data from a different database or table. To change the database or table name of a pipeline, you must create a new pipeline.
+ Data Definition Language (DDL) statements are generally not supported. Data consistency will not be maintained if:
  + Primary keys are changed (add/delete/rename).
  + Tables are dropped/truncated.
  + Column names or data types are changed.
+ If the MySQL tables to sync don't have primary keys defined, data consistency isn't guaranteed. You must properly define a custom `document_id` option in the OpenSearch sink configuration to sync updates and deletes to OpenSearch.
+ Foreign key references with cascading delete actions are not supported and can result in data inconsistency between RDS for MySQL and OpenSearch.
+ Amazon RDS Multi-AZ DB clusters are not supported.
+ Supported versions: MySQL version 8.0 and higher.

## Recommended CloudWatch Alarms
<a name="aurora-mysql-pipeline-metrics"></a>

The following CloudWatch metrics are recommended for monitoring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the number of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can set up CloudWatch alarms to perform an action when one of these metrics exceeds a specified value for a specified amount of time.


| Metric | Description | 
| --- | --- | 
| pipeline-name.rds.credentialsChanged | This metric indicates how often AWS secrets are rotated. | 
| pipeline-name.rds.executorRefreshErrors | This metric indicates failures to refresh AWS secrets. | 
| pipeline-name.rds.exportRecordsTotal | This metric indicates the number of records exported from Amazon RDS. | 
| pipeline-name.rds.exportRecordsProcessed | This metric indicates the number of records processed by the OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.exportRecordProcessingErrors | This metric indicates the number of processing errors in an OpenSearch Ingestion pipeline while reading the data from an Amazon RDS instance. | 
| pipeline-name.rds.exportRecordsSuccessTotal | This metric indicates the total number of export records processed successfully. | 
| pipeline-name.rds.exportRecordsFailedTotal | This metric indicates the total number of export records that failed to process. | 
| pipeline-name.rds.bytesReceived | This metric indicates the total number of bytes received by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.bytesProcessed | This metric indicates the total number of bytes processed by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.streamRecordsSuccessTotal | This metric indicates the number of records successfully processed from the stream. | 
| pipeline-name.rds.streamRecordsFailedTotal | This metric indicates the total number of records that failed to process from the stream. | 
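
For example, an alarm on the stream failure rate can be derived from the success and failure counters above. A sketch of the arithmetic (the sample counts and the 5% threshold are made up for illustration):

```python
def failure_rate(success_total, failed_total):
    """Fraction of stream records that failed to process."""
    total = success_total + failed_total
    return failed_total / total if total else 0.0

# Made-up sample values for streamRecordsSuccessTotal / streamRecordsFailedTotal.
rate = failure_rate(9_900, 100)
print(rate)  # 0.01
threshold = 0.05  # e.g. alarm when more than 5% of stream records fail
print(rate > threshold)  # False
```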

# RDS for PostgreSQL
<a name="rds-PostgreSQL"></a>

Complete the following steps to configure an OpenSearch Ingestion pipeline with Amazon RDS for PostgreSQL.

**Topics**
+ [RDS for PostgreSQL prerequisites](#rds-PostgreSQL-prereqs)
+ [Step 1: Configure the pipeline role](#rds-mysql-pipeline-role)
+ [Step 2: Create the pipeline](#rds-PostgreSQL-pipeline)
+ [Data consistency](#rds-mysql-pipeline-consistency)
+ [Mapping data types](#rds-PostgreSQL-pipeline-mapping)
+ [Limitations](#rds-PostgreSQL-pipeline-limitations)
+ [Recommended CloudWatch Alarms](#aurora-mysql-pipeline-metrics)

## RDS for PostgreSQL prerequisites
<a name="rds-PostgreSQL-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. [Create a custom DB parameter group](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/zero-etl.setting-up.html#zero-etl.parameters) in Amazon RDS to configure logical replication.

   ```
   rds.logical_replication=1
   ```

   For more information, see [Performing logical replication for Amazon RDS for PostgreSQL](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/PostgreSQL.Concepts.General.FeatureSupport.LogicalReplication.html).

1. [Select or create an RDS for PostgreSQL DB instance](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_GettingStarted.CreatingConnecting.PostgreSQL.html) and associate the parameter group created in step 1 with the DB instance.

1. Set up username and password authentication on your Amazon RDS instance using [password management with Aurora and AWS Secrets Manager](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/rds-secrets-manager.html). You can also create a username/password combination by [creating a Secrets Manager secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html).

1. If you use the full initial snapshot feature, create an AWS KMS key and an IAM role for exporting data from Amazon RDS to Amazon S3.

   The IAM role should have the following permission policy:

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "ExportPolicy",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject*",
                   "s3:ListBucket",
                   "s3:GetObject*",
                   "s3:DeleteObject*",
                   "s3:GetBucketLocation"
               ],
               "Resource": [
                   "arn:aws:s3:::s3-bucket-used-in-pipeline",
                   "arn:aws:s3:::s3-bucket-used-in-pipeline/*"
               ]
           }
       ]
   }
   ```

------

   The role should also have the following trust relationships:

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "export.rds.amazonaws.com"
               },
               "Action": "sts:AssumeRole"
           }
       ]
   }
   ```

------

1. Select or create an OpenSearch Service domain or OpenSearch Serverless collection. For more information, see [Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains) and [Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your Amazon RDS DB instance to your domain or collection.

## Step 1: Configure the pipeline role
<a name="rds-mysql-pipeline-role"></a>

After you have your Amazon RDS pipeline prerequisites set up, [configure the pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) to use in your pipeline configuration. Also add the following permissions for the Amazon RDS source to the role:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "allowReadingFromS3Buckets",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::s3_bucket",
                "arn:aws:s3:::s3_bucket/*"
            ]
        },
        {
            "Sid": "allowNetworkInterfacesActions",
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:111122223333:network-interface/*",
                "arn:aws:ec2:*:111122223333:subnet/*",
                "arn:aws:ec2:*:111122223333:security-group/*"
            ]
        },
        {
            "Sid": "allowDescribeEC2",
            "Effect": "Allow",
            "Action": [
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        {
            "Sid": "allowTagCreation",
            "Effect": "Allow",
            "Action": [
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:111122223333:network-interface/*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/OSISManaged": "true"
                }
            }
        },
        {
            "Sid": "AllowDescribeInstances",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBInstances"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:db:*"
            ]
        },
        {
            "Sid": "AllowSnapshots",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeDBSnapshots",
                "rds:CreateDBSnapshot",
                "rds:AddTagsToResource"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:db:DB-id",
                "arn:aws:rds:us-east-2:111122223333:snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowExport",
            "Effect": "Allow",
            "Action": [
                "rds:StartExportTask"
            ],
            "Resource": [
                "arn:aws:rds:us-east-2:111122223333:snapshot:DB-id*"
            ]
        },
        {
            "Sid": "AllowDescribeExports",
            "Effect": "Allow",
            "Action": [
                "rds:DescribeExportTasks"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aws:RequestedRegion": "us-east-2",
                    "aws:ResourceAccount": "111122223333"
                }
            }
        },
        {
            "Sid": "AllowAccessToKmsForExport",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:Encrypt",
                "kms:DescribeKey",
                "kms:RetireGrant",
                "kms:CreateGrant",
                "kms:ReEncrypt*",
                "kms:GenerateDataKey*"
            ],
            "Resource": [
                "arn:aws:kms:us-east-2:111122223333:key/export-key-id"
            ]
        },
        {
            "Sid": "AllowPassingExportRole",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": [
                "arn:aws:iam::111122223333:role/export-role"
            ]
        },
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": [
                "arn:aws:secretsmanager:*:111122223333:secret:*"
            ]
        }
    ]
}
```

------

## Step 2: Create the pipeline
<a name="rds-PostgreSQL-pipeline"></a>

Configure an OpenSearch Ingestion pipeline like the following, which specifies an RDS for PostgreSQL instance as the source. 

```
version: "2"
rds-postgres-pipeline:
  source:
    rds:
      db_identifier: "instance-id"
      engine: postgresql
      database: "database-name"
      tables:
        include:
          - "schema1.table1"
          - "schema2.table2"
      s3_bucket: "bucket-name"
      s3_region: "bucket-region"
      s3_prefix: "prefix-name"
      export:
        kms_key_id: "kms-key-id"
        iam_role_arn: "export-role-arn"
      stream: true
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
      authentication:
        username: ${{aws_secrets:secret:username}}
        password: ${{aws_secrets:secret:password}}
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
        index: "${getMetadata(\"table_name\")}"
        index_type: custom
        document_id: "${getMetadata(\"primary_key\")}"
        action: "${getMetadata(\"opensearch_action\")}"
        document_version: "${getMetadata(\"document_version\")}"
        document_version_type: "external"
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
extension:
  aws:
    secrets:
      secret:
        secret_id: "rds-secret-id"
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        refresh_interval: PT1H
```

**Note**  
You can use a preconfigured Amazon RDS blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

To use Amazon RDS as a source, you need to configure VPC access for the pipeline. The VPC you choose should be the same VPC that your Amazon RDS source uses. Then choose one or more subnets and one or more VPC security groups. Note that the pipeline needs network access to the RDS for PostgreSQL database, so you should also verify that your DB instance is configured with a VPC security group that allows inbound traffic from the pipeline's VPC security group to the database port. For more information, see [Controlling access with security groups](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Overview.RDSSecurityGroups.html).

If you're using the AWS Management Console to create your pipeline, you must also attach your pipeline to your VPC in order to use Amazon RDS as a source. To do this, find the **Network configuration** section, choose **Attach to VPC**, and choose a CIDR from one of the provided default options, or select your own. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).

To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and Amazon RDS, ensure that the Amazon RDS VPC CIDR is different from the CIDR for OpenSearch Ingestion.

For more information, see [Configuring VPC access for a pipeline](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security.html#pipeline-vpc-configure).

## Data consistency
<a name="rds-mysql-pipeline-consistency"></a>

The pipeline ensures data consistency by continuously polling or receiving changes from the Amazon RDS instance and updating the corresponding documents in the OpenSearch index.

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records into the OpenSearch domain or collection. If you want to ingest into an OpenSearch Serverless search collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless time series collection, note that the pipeline doesn't generate a document ID, so you must omit `document_id: "${getMetadata(\"primary_key\")}"` in your pipeline sink configuration. 

An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in Amazon RDS is reconciled with the corresponding document changes in OpenSearch.

## Mapping data types
<a name="rds-PostgreSQL-pipeline-mapping"></a>

An OpenSearch Ingestion pipeline maps PostgreSQL data types to representations that are suitable for OpenSearch Service domains or collections to consume. If no mapping template is defined in OpenSearch, OpenSearch automatically determines field types with [dynamic mapping](https://docs.opensearch.org/latest/field-types/#dynamic-mapping) based on the first document sent. You can also explicitly define the field types that work best for you in OpenSearch through a mapping template. 

The table below lists RDS for PostgreSQL data types and the corresponding OpenSearch field types. The *Default OpenSearch Field Type* column shows the field type that OpenSearch assigns with dynamic mapping if no explicit mapping is defined. The *Recommended OpenSearch Field Type* column shows the field type that we recommend you explicitly specify in a mapping template. These field types align more closely with the data types in RDS for PostgreSQL and usually enable better search features in OpenSearch.


| RDS for PostgreSQL Data Type | Default OpenSearch Field Type | Recommended OpenSearch Field Type | 
| --- | --- | --- | 
| smallint | long | short | 
| integer | long | integer | 
| bigint | long | long | 
| decimal | text | double or keyword | 
| numeric[ (p, s) ] | text | double or keyword | 
| real | float | float | 
| double precision | float | double | 
| smallserial | long | short | 
| serial | long | integer | 
| bigserial | long | long | 
| money | object | object | 
| character varying(n) | text | text | 
| varchar(n) | text | text | 
| character(n) | text | text | 
| char(n) | text | text | 
| bpchar(n) | text | text | 
| bpchar | text | text | 
| text | text | text | 
| enum | text | text | 
| bytea | text | binary | 
| timestamp [ (p) ] [ without time zone ] | long (in epoch milliseconds) | date | 
| timestamp [ (p) ] with time zone | long (in epoch milliseconds) | date | 
| date | long (in epoch milliseconds) | date | 
| time [ (p) ] [ without time zone ] | long (in epoch milliseconds) | date | 
| time [ (p) ] with time zone | long (in epoch milliseconds) | date | 
| interval [ fields ] [ (p) ] | text (ISO8601 format) | text | 
| boolean | boolean | boolean | 
| point | text (in WKT format) | geo_shape | 
| line | text (in WKT format) | geo_shape | 
| lseg | text (in WKT format) | geo_shape | 
| box | text (in WKT format) | geo_shape | 
| path | text (in WKT format) | geo_shape | 
| polygon | text (in WKT format) | geo_shape | 
| circle | object | object | 
| cidr | text | text | 
| inet | text | text | 
| macaddr | text | text | 
| macaddr8 | text | text | 
| bit(n) | long | byte, short, integer, or long (depending on the number of bits) | 
| bit varying(n) | long | byte, short, integer, or long (depending on the number of bits) | 
| json | object | object | 
| jsonb | object | object | 
| jsonpath | text | text | 
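
PostgreSQL geometric values arrive as WKT text. If you map the target field as `geo_point` (for single points) rather than `geo_shape`, the coordinates can be extracted from the WKT string. A minimal parsing sketch for `point` values (the WKT samples are illustrative, and a real pipeline would typically use a processor rather than hand-rolled parsing):

```python
import re

def wkt_point_to_geo_point(wkt):
    """Convert a WKT POINT string, e.g. 'POINT (30 10)', into an
    OpenSearch geo_point object ({"lon": ..., "lat": ...})."""
    match = re.match(r"POINT\s*\(\s*(-?[\d.]+)\s+(-?[\d.]+)\s*\)", wkt.strip())
    if not match:
        raise ValueError(f"not a WKT point: {wkt!r}")
    lon, lat = float(match.group(1)), float(match.group(2))
    return {"lon": lon, "lat": lat}

print(wkt_point_to_geo_point("POINT (30 10)"))  # {'lon': 30.0, 'lat': 10.0}
```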

We recommend that you configure a dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you configure a queue, OpenSearch Service sends to it all documents that fail to be ingested because of dynamic mapping failures.

If automatic mappings fail, you can use `template_type` and `template_content` in your pipeline configuration to define explicit mapping rules. Alternatively, you can create mapping templates directly in your search domain or collection before you start the pipeline.

## Limitations
<a name="rds-PostgreSQL-pipeline-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for RDS for PostgreSQL:
+ The integration only supports one PostgreSQL database per pipeline.
+ The integration does not currently support cross-region data ingestion; your Amazon RDS instance and OpenSearch domain must be in the same AWS Region.
+ The integration does not currently support cross-account data ingestion; your Amazon RDS instance and OpenSearch Ingestion pipeline must be in the same AWS account. 
+ Ensure that the Amazon RDS instance has authentication enabled using AWS Secrets Manager, which is the only supported authentication mechanism.
+ The existing pipeline configuration can't be updated to ingest data from a different database and/or a different table. To update the database and/or table name of a pipeline, you have to stop the pipeline and restart it with an updated configuration, or create a new pipeline.
+ Data Definition Language (DDL) statements are generally not supported. Data consistency will not be maintained if:
  + Primary keys are changed (add/delete/rename).
  + Tables are dropped/truncated.
  + Column names or data types are changed.
+ If the PostgreSQL tables to sync don’t have primary keys defined, data consistency isn't guaranteed. You must properly define a custom `document_id` option in the OpenSearch sink configuration to sync updates and deletes to OpenSearch.
+ Amazon RDS Multi-AZ DB clusters are not supported.
+ Supported versions: PostgreSQL 16 and higher.

## Recommended CloudWatch Alarms
<a name="aurora-mysql-pipeline-metrics"></a>

The following CloudWatch metrics are recommended for monitoring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the number of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can set up CloudWatch alarms to perform an action when one of these metrics exceeds a specified value for a specified amount of time.


| Metric | Description | 
| --- | --- | 
| pipeline-name.rds.credentialsChanged | This metric indicates how often AWS secrets are rotated. | 
| pipeline-name.rds.executorRefreshErrors | This metric indicates failures to refresh AWS secrets. | 
| pipeline-name.rds.exportRecordsTotal | This metric indicates the number of records exported from Amazon RDS. | 
| pipeline-name.rds.exportRecordsProcessed | This metric indicates the number of records processed by the OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.exportRecordProcessingErrors | This metric indicates the number of processing errors in an OpenSearch Ingestion pipeline while reading the data from an Amazon RDS instance. | 
| pipeline-name.rds.exportRecordsSuccessTotal | This metric indicates the total number of export records processed successfully. | 
| pipeline-name.rds.exportRecordsFailedTotal | This metric indicates the total number of export records that failed to process. | 
| pipeline-name.rds.bytesReceived | This metric indicates the total number of bytes received by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.bytesProcessed | This metric indicates the total number of bytes processed by an OpenSearch Ingestion pipeline. | 
| pipeline-name.rds.streamRecordsSuccessTotal | This metric indicates the number of records successfully processed from the stream. | 
| pipeline-name.rds.streamRecordsFailedTotal | This metric indicates the total number of records that failed to process from the stream. | 
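As a sketch, you can define such an alarm in a CloudFormation template. The pipeline name, threshold, and SNS topic ARN below are placeholders; OpenSearch Ingestion publishes pipeline metrics to the `AWS/OSIS` namespace.

```
StreamFailureAlarm:
  Type: AWS::CloudWatch::Alarm
  Properties:
    AlarmDescription: Records are failing to process from the stream
    Namespace: AWS/OSIS
    MetricName: my-pipeline.rds.streamRecordsFailedTotal
    Statistic: Sum
    Period: 300
    EvaluationPeriods: 3
    Threshold: 0
    ComparisonOperator: GreaterThanThreshold
    TreatMissingData: notBreaching
    AlarmActions:
      - arn:aws:sns:us-east-1:111122223333:pipeline-alerts
```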

# Using an OpenSearch Ingestion pipeline with Amazon S3
<a name="configure-client-s3"></a>

With OpenSearch Ingestion, you can use Amazon S3 as a source or as a destination. When you use Amazon S3 as a source, you send data to an OpenSearch Ingestion pipeline. When you use Amazon S3 as a destination, you write data from an OpenSearch Ingestion pipeline to one or more S3 buckets.

**Topics**
+ [Amazon S3 as a source](#s3-source)
+ [Amazon S3 as a destination](#s3-destination)
+ [Amazon S3 cross account as a source](#fdsf)

## Amazon S3 as a source
<a name="s3-source"></a>

There are two ways that you can use Amazon S3 as a source to process data—with *S3-SQS processing* and with *scheduled scans*. 

Use S3-SQS processing when you require near real-time scanning of files after they are written to S3. You can configure Amazon S3 buckets to publish an event any time an object is stored or modified within the bucket. Use a one-time or recurring scheduled scan to batch process data in an S3 bucket. 

**Topics**
+ [Prerequisites](#s3-prereqs)
+ [Step 1: Configure the pipeline role](#s3-pipeline-role)
+ [Step 2: Create the pipeline](#s3-pipeline)

### Prerequisites
<a name="s3-prereqs"></a>

To use Amazon S3 as the source for an OpenSearch Ingestion pipeline, for either a scheduled scan or S3-SQS processing, first [create an S3 bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html).

**Note**  
If the S3 bucket used as a source in the OpenSearch Ingestion pipeline is in a different AWS account, you also need to enable cross-account read permissions on the bucket. This allows the pipeline to read and process the data. To enable cross-account permissions, see [Bucket owner granting cross-account bucket permissions](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html) in the *Amazon S3 User Guide*.  
If your S3 buckets are in multiple accounts, use a `bucket_owners` map. For an example, see [Cross-account S3 access](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#cross-account-s3-access) in the OpenSearch documentation.

To set up S3-SQS processing, you also need to perform the following steps:

1. [Create an Amazon SQS queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html).

1. [Enable event notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html) on the S3 bucket with the SQS queue as a destination.

### Step 1: Configure the pipeline role
<a name="s3-pipeline-role"></a>

Unlike other source plugins that *push* data to a pipeline, the [S3 source plugin](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) has a read-based architecture in which the pipeline *pulls* data from the source. 

Therefore, in order for a pipeline to read from S3, you must specify a role within the pipeline's S3 source configuration that has access to both the S3 bucket and the Amazon SQS queue. The pipeline will assume this role in order to read data from the queue.

**Note**  
The role that you specify within the S3 source configuration must be the [pipeline role](). Therefore, your pipeline role must contain two separate permissions policies—one to write to a sink, and one to pull from the S3 source. You must use the same `sts_role_arn` in all pipeline components.

The following sample policy shows the required permissions for using S3 as a source:

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action":[
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:GetObject"
       ],
      "Resource": [
        "arn:aws:s3:::amzn-s3-demo-bucket",
        "arn:aws:s3:::amzn-s3-demo-bucket/*"
      ]
    },
    {
       "Effect":"Allow",
       "Action":"s3:ListAllMyBuckets",
       "Resource":"arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:us-east-1:111122223333:MyS3EventSqsQueue"
    }
  ]
}
```

------

You must attach these permissions to the IAM role that you specify in the `sts_role_arn` option within the S3 source plugin configuration:

```
version: "2"
source:
  s3:
    ...
    aws:
      sts_role_arn: arn:aws:iam::account-id:role/pipeline-role
      ...
processor:
  ...
sink:
  - opensearch:
      ...
```

### Step 2: Create the pipeline
<a name="s3-pipeline"></a>

After you've set up your permissions, you can configure an OpenSearch Ingestion pipeline depending on your Amazon S3 use case.

#### S3-SQS processing
<a name="s3-sqs-processing"></a>

To set up S3-SQS processing, configure your pipeline to specify S3 as the source and set up Amazon SQS notifications:

```
version: "2"
s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline: null
      sqs:
        queue_url: "https://sqs.us-east-1.amazonaws.com/account-id/ingestion-queue"
      compression: "none"
      aws:
        region: "region"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"]
      index: "index-name"
      aws:
        region: "region"
```

If you observe low CPU utilization while processing small files on Amazon S3, consider increasing the throughput by modifying the value of the `workers` option. For more information, see the [S3 plugin configuration options](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#configuration).
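For example, the following sketch raises the worker count in the S3 source. The value shown is illustrative; tune it against your file sizes and observed CPU utilization.

```
source:
  s3:
    notification_type: "sqs"
    workers: 3
    ...
```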

#### Scheduled scan
<a name="s3-scheduled-scan"></a>

To set up a scheduled scan, configure your pipeline with a schedule at the scan level that applies to all your S3 buckets, or at the bucket level. A bucket-level schedule or scan-interval configuration always overrides a scan-level configuration. 

You can configure scheduled scans with either a *one-time scan*, which is ideal for data migration, or a *recurring scan*, which is ideal for batch processing. 

To configure your pipeline to read from Amazon S3, use the preconfigured Amazon S3 blueprints. You can edit the `scan` portion of your pipeline configuration to meet your scheduling needs. For more information, see [Working with blueprints](pipeline-blueprint.md).

**One-time scan**

A one-time scheduled scan runs once. In your pipeline configuration, you can use a `start_time` and `end_time` to specify when you want the objects in the bucket to be scanned. Alternatively, you can use `range` to specify the interval of time relative to current time that you want the objects in the bucket to be scanned. 

For example, a range set to `PT4H` scans all files created in the last four hours. To configure a one-time scan to run a second time, you must stop and restart the pipeline. If you don't have a range configured, you must also update the start and end times.
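For example, the following sketch uses `range` in place of explicit start and end times to scan only objects created in the last four hours (the bucket name is a placeholder):

```
scan:
  range: "PT4H"
  buckets:
    - bucket:
        name: my-bucket-1
```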

The following configuration sets up a one-time scan for all buckets and all objects in those buckets:

```
version: "2"
log-pipeline:
  source:
    s3:
      codec:
        csv:
      compression: "none"
      aws:
        region: "region"
      acknowledgments: true
      scan:
        buckets:
          - bucket:
              name: my-bucket
              filter:
                include_prefix:
                  - Objects1/
                exclude_suffix:
                  - .jpeg
                  - .png
          - bucket:
              name: my-bucket-2
              key_prefix:
                include:
                  - Objects2/
                exclude_suffix:
                  - .jpeg
                  - .png
      delete_s3_objects_on_read: false
  processor:
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"]
        index: "index-name"
        aws:
          region: "region"
        dlq:
          s3:
            bucket: "dlq-bucket"
            region: "us-east-1"
```

The following configuration sets up a one-time scan for all buckets during a specified time window. The pipeline processes only those objects with creation times that fall within this window.

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

The following configuration sets up a one-time scan at both the scan level and the bucket level. Start and end times at the bucket level override start and end times at the scan level. 

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

Stopping a pipeline removes any pre-existing record of which objects the pipeline has already scanned. If a one-time scan pipeline is stopped, it rescans all objects after it's started, even if they were already scanned. If you need to stop a one-time scan pipeline, we recommend changing your time window before starting the pipeline again.

If you need to filter objects by start time and end time, stopping and starting your pipeline is the only option. If you don't need to filter by start time and end time, you can filter objects by name. Filtering by name doesn't require you to stop and start your pipeline. To do this, use `include_prefix` and `exclude_suffix`.

**Recurring scan**

A recurring scheduled scan runs a scan of your specified S3 buckets at regular, scheduled intervals. You can only configure these intervals at the scan level because individual bucket-level configurations aren't supported. 

In your pipeline configuration, the `interval` specifies the frequency of the recurring scan, and can be between 30 seconds and 365 days. The first of these scans always occurs when you create the pipeline. The `count` defines the total number of scan instances.

The following configuration sets up a recurring scan, with a delay of 12 hours between the scans:

```
scan:
  scheduling:
    interval: PT12H
    count: 4
  buckets:
    - bucket:
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

## Amazon S3 as a destination
<a name="s3-destination"></a>

To write data from an OpenSearch Ingestion pipeline to an S3 bucket, use the preconfigured S3 blueprint to create a pipeline with an [S3 sink](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/). This pipeline routes selective data to an OpenSearch sink and simultaneously sends all data for archival in S3. For more information, see [Working with blueprints](pipeline-blueprint.md).

When you create your S3 sink, you can specify your preferred formatting from a variety of [sink codecs](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/#codec). For example, if you want to write data in columnar format, choose the Parquet or Avro codec. If you prefer a row-based format, choose JSON or NDJSON. To write data to S3 in a specified schema, you can also define an inline schema within sink codecs using the [Avro format](https://avro.apache.org/docs/current/specification/#schema-declaration). 

The following example defines an inline schema in an S3 sink:

```
- s3:
    codec:
      parquet:
        schema: >
          {
             "type" : "record",
             "namespace" : "org.vpcFlowLog.examples",
             "name" : "VpcFlowLog",
             "fields" : [
               { "name" : "version", "type" : "string"},
               { "name" : "srcport", "type": "int"},
               { "name" : "dstport", "type": "int"},
               { "name" : "start", "type": "int"},
               { "name" : "end", "type": "int"},
               { "name" : "protocol", "type": "int"},
               { "name" : "packets", "type": "int"},
               { "name" : "bytes", "type": "int"},
               { "name" : "action", "type": "string"},
               { "name" : "logStatus", "type" : "string"}
             ]
          }
```

When you define this schema, specify a superset of all keys that might be present in the different types of events that your pipeline delivers to a sink. 

For example, if a key might be missing from some events, add that key to your schema as a nullable type. Null declarations allow the schema to process non-uniform data, where some events have these keys and others don't. When incoming events do have these keys present, their values are written to sinks. 
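In Avro terms, a key that might be absent is declared as a union with `null`. The field name below is illustrative:

```
{ "name" : "optional_field", "type" : ["null", "string"], "default" : null }
```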

This schema definition acts as a filter that only allows defined keys to be sent to sinks, and drops undefined keys from incoming events. 

You can also use `include_keys` and `exclude_keys` in your sink to filter data that's routed to other sinks. These two filters are mutually exclusive, so you can use only one of them in a given sink. Additionally, you can't use them within user-defined schemas. 

To create pipelines with such filters, use the preconfigured sink filter blueprint. For more information, see [Working with blueprints](pipeline-blueprint.md).
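As a sketch, a sink section that archives only selected keys to S3 while sending full events to OpenSearch might look like the following. The key names are illustrative:

```
sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"]
      index: "index-name"
  - s3:
      include_keys: ["timestamp", "status_code", "message"]
      ...
```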

## Amazon S3 cross account as a source
<a name="fdsf"></a>

You can grant access across accounts with Amazon S3 so that OpenSearch Ingestion pipelines can access S3 buckets in another account as a source. To enable cross-account access, see [Bucket owner granting cross-account bucket permissions](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html) in the *Amazon S3 User Guide*. After you have granted access, ensure that your pipeline role has the required permissions.

Then, you can create a pipeline using `bucket_owners` to enable cross-account access to an Amazon S3 bucket as a source:

```
s3-pipeline:
 source:
  s3:
   notification_type: "sqs"
   codec:
    csv:
     delimiter: ","
     quote_character: "\""
     detect_header: True
   sqs:
     queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue"
   bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999
   compression: "gzip"
```

# Using an OpenSearch Ingestion pipeline with Amazon Security Lake
<a name="configure-client-security-lake"></a>

You can use the [S3 source plugin](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) to ingest data from [Amazon Security Lake](https://docs.aws.amazon.com/security-lake/latest/userguide/what-is-security-lake.html) into your OpenSearch Ingestion pipeline. Security Lake automatically centralizes security data from AWS environments, on-premises environments, and SaaS providers into a purpose-built data lake. You can create a subscription that replicates data from Security Lake to your OpenSearch Ingestion pipeline, which then writes it to your OpenSearch Service domain or OpenSearch Serverless collection.

To configure your pipeline to read from Security Lake, use the preconfigured Security Lake blueprint. The blueprint includes a default configuration for ingesting Open Cybersecurity Schema Framework (OCSF) parquet files from Security Lake. For more information, see [Working with blueprints](pipeline-blueprint.md).

**Topics**
+ [Using an OpenSearch Ingestion pipeline with Amazon Security Lake as a source](configure-client-source-security-lake.md)
+ [Using an OpenSearch Ingestion pipeline with Amazon Security Lake as a sink](configure-client-sink-security-lake.md)

# Using an OpenSearch Ingestion pipeline with Amazon Security Lake as a source
<a name="configure-client-source-security-lake"></a>

You can use the Amazon S3 source plugin within your OpenSearch Ingestion pipeline to ingest data from Amazon Security Lake. Security Lake automatically centralizes security data from AWS environments, on-premises systems, and SaaS providers into a purpose-built data lake.

Amazon Security Lake has the following metadata attributes within a pipeline:
+ `bucket_name`: The name of the Amazon S3 bucket created by Security Lake for storing security data.
+ `path_prefix`: The custom source name defined in the Security Lake IAM role policy.
+ `region`: The AWS Region where the Security Lake S3 bucket is located.
+ `accountID`: The AWS account ID in which Security Lake is enabled.
+ `sts_role_arn`: The ARN of the IAM role intended for use with Security Lake.

## Prerequisites
<a name="sl-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:
+ [Enable Security Lake](https://docs.aws.amazon.com/security-lake/latest/userguide/getting-started.html#enable-service).
+ [Create a subscriber](https://docs.aws.amazon.com/security-lake/latest/userguide/subscriber-data-access.html#create-subscriber-data-access) in Security Lake.
  + Choose the sources that you want to ingest into your pipeline.
  + For **Subscriber credentials**, add the ID of the AWS account where you intend to create the pipeline. For the external ID, specify `OpenSearchIngestion-{accountid}`.
  + For **Data access method**, choose **S3**.
  + For **Notification details**, choose **SQS queue**.

When you create a subscriber, Security Lake automatically creates two inline permissions policies—one for S3 and one for SQS. The policies take the following format: `AmazonSecurityLake-amzn-s3-demo-bucket-S3` and `AmazonSecurityLake-AWSDemo-SQS`. To allow your pipeline to access the subscriber sources, you must associate the required permissions with your pipeline role.

## Configure the pipeline role
<a name="sl-pipeline-role"></a>

Create a new permissions policy in IAM that combines only the required permissions from the two policies that Security Lake automatically created. The following example policy shows the least privilege required for an OpenSearch Ingestion pipeline to read data from multiple Security Lake sources:

------
#### [ JSON ]

****  

```
{
   "Version":"2012-10-17",		 	 	 
   "Statement":[
      {
         "Effect":"Allow",
         "Action":[
            "s3:GetObject"
         ],
         "Resource":[
            "arn:aws:s3:::aws-security-data-lake-us-east-1-abcde/aws/LAMBDA_EXECUTION/1.0/*",
            "arn:aws:s3:::aws-security-data-lake-us-east-1-abcde/aws/S3_DATA/1.0/*",
            "arn:aws:s3:::aws-security-data-lake-us-east-1-abcde/aws/VPC_FLOW/1.0/*",
            "arn:aws:s3:::aws-security-data-lake-us-east-1-abcde/aws/ROUTE53/1.0/*",
            "arn:aws:s3:::aws-security-data-lake-us-east-1-abcde/aws/SH_FINDINGS/1.0/*"
         ]
      },
      {
         "Effect":"Allow",
         "Action":[
            "sqs:ReceiveMessage",
            "sqs:DeleteMessage"
         ],
         "Resource":[
            "arn:aws:sqs:us-east-1:111122223333:AmazonSecurityLake-abcde-Main-Queue"
         ]
      }
   ]
}
```

------

**Important**  
Security Lake doesn’t manage the pipeline role policy for you. If you add or remove sources from your Security Lake subscription, you must manually update the policy. Security Lake creates partitions for each log source, so you need to manually add or remove permissions in the pipeline role.

You must attach these permissions to the IAM role that you specify in the `sts_role_arn` option within the S3 source plugin configuration, under `sqs`.

```
version: "2"
source:
  s3:
    ...
    sqs:
      queue_url: "https://sqs.us-east-1.amazonaws.com/account-id/AmazonSecurityLake-amzn-s3-demo-bucket-Main-Queue"
    aws:
      ...
processor:
  ...
sink:
  - opensearch:
      ...
```

## Create the pipeline
<a name="sl-pipeline"></a>

After you add the permissions to the pipeline role, use the preconfigured Security Lake blueprint to create the pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

You must specify the `queue_url` option within the `s3` source configuration, which is the Amazon SQS queue URL to read from. To format the URL, locate the **Subscription endpoint** in the subscriber configuration and change `arn:aws:` to `https://`. For example, `https://sqs.us-east-1.amazonaws.com/account-id/AmazonSecurityLake-AWSDemo-Main-Queue`.

The `sts_role_arn` that you specify within the S3 source configuration must be the ARN of the pipeline role.

# Using an OpenSearch Ingestion pipeline with Amazon Security Lake as a sink
<a name="configure-client-sink-security-lake"></a>

Use the Amazon S3 sink plugin in OpenSearch Ingestion to send data from any supported source to Amazon Security Lake. Security Lake collects and stores security data from AWS, on-premises environments, and SaaS providers in a dedicated data lake.

To configure your pipeline to write log data to Security Lake, use the preconfigured **Firewall Traffic logs** blueprint. The blueprint includes a default configuration for retrieving raw security logs or other data stored in an Amazon S3 bucket, processing the records, and normalizing them. It then maps the data to Open Cybersecurity Schema Framework (OCSF) and sends the transformed OCSF-compliant data to Security Lake.

The pipeline has the following metadata attributes:
+ `bucket_name`: The name of the Amazon S3 bucket created by Security Lake for storing security data.
+ `path_prefix`: The custom source name defined in the Security Lake IAM role policy.
+ `region`: The AWS Region where the Security Lake S3 bucket is located.
+ `accountID`: The AWS account ID in which Security Lake is enabled.
+ `sts_role_arn`: The ARN of the IAM role intended for use with Security Lake.

## Prerequisites
<a name="configure-clients-lambda-prereqs"></a>

Before you create a pipeline to send data to Security Lake, perform the following steps:
+ **Enable and configure Amazon Security Lake**: Set up Amazon Security Lake to centralize security data from various sources. For instructions, see [Enabling Security Lake using the console](https://docs.aws.amazon.com/security-lake/latest/userguide/get-started-console.html).

  When you select a source, choose **Ingest specific AWS sources** and select one or more log and event sources that you want to ingest.
+ **Set up permissions**: Configure the pipeline role with the required permissions to write data to Security Lake. For more information, see [Pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink).

## Create the pipeline
<a name="create-opensearch-ingestion-pipeline"></a>

Use the preconfigured Security Lake blueprint to create the pipeline. For more information, see [Using blueprints to create a pipeline](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-blueprint.html). 

# Using an OpenSearch Ingestion pipeline with Fluent Bit
<a name="configure-client-fluentbit"></a>

This sample [Fluent Bit configuration file](https://docs.fluentbit.io/manual/pipeline/outputs/http) sends log data from Fluent Bit to an OpenSearch Ingestion pipeline. For more information about ingesting log data, see [Log Analytics](https://github.com/opensearch-project/data-prepper/blob/main/docs/log_analytics.md) in the Data Prepper documentation.

Note the following:
+ The `host` value must be your pipeline endpoint. For example, `pipeline-endpoint.us-east-1.osis.amazonaws.com`.
+ The `aws_service` value must be `osis`.
+ The `aws_role_arn` value is the ARN of the AWS IAM role for the client to assume and use for Signature Version 4 authentication.

```
[INPUT]
  name                  tail
  refresh_interval      5
  path                  /var/log/test.log
  read_from_head        true

[OUTPUT]
  Name http
  Match *
  Host pipeline-endpoint.us-east-1.osis.amazonaws.com
  Port 443
  URI /log/ingest
  Format json
  aws_auth true
  aws_region region
  aws_service osis
  aws_role_arn arn:aws:iam::account-id:role/ingestion-role
  Log_Level trace
  tls On
```

You can then configure an OpenSearch Ingestion pipeline like the following, which has HTTP as the source:

```
version: "2"
unaggregated-log-pipeline:
  source:
    http:
      path: "/log/ingest"
  processor:
    - grok:
        match:
          log:
            - "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:network_node} %{NOTSPACE:network_host} %{IPORHOST:source_ip}:%{NUMBER:source_port:int} -> %{IPORHOST:destination_ip}:%{NUMBER:destination_port:int} %{GREEDYDATA:details}"
    - grok:
        match:
          details:
            - "'%{NOTSPACE:http_method} %{NOTSPACE:http_uri}' %{NOTSPACE:protocol}"
            - "TLS%{NOTSPACE:tls_version} %{GREEDYDATA:encryption}"
            - "%{NUMBER:status_code:int} %{NUMBER:response_size:int}"
    - delete_entries:
        with_keys: ["details", "log"]

  sink:
    - opensearch:
        hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"]
        index: "index_name"
        index_type: custom
        bulk_size: 20
        aws:
          region: "region"
```

# Using an OpenSearch Ingestion pipeline with Fluentd
<a name="configure-client-fluentd"></a>

Fluentd is an open-source data collection ecosystem that provides SDKs for different languages and sub-projects like Fluent Bit. This sample [Fluentd configuration file](https://docs.fluentd.org/output/http#example-configuration) sends log data from Fluentd to an OpenSearch Ingestion pipeline. For more information about ingesting log data, see [Log Analytics](https://github.com/opensearch-project/data-prepper/blob/main/docs/log_analytics.md) in the Data Prepper documentation.

Note the following:
+ The `endpoint` value must be your pipeline endpoint. For example, `pipeline-endpoint.us-east-1.osis.amazonaws.com/apache-log-pipeline/logs`.
+ The `aws_service` value must be `osis`.
+ The `aws_role_arn` value is the ARN of the AWS IAM role for the client to assume and use for Signature Version 4 authentication.

```
<source>
  @type tail
  path logs/sample.log
  path_key log
  tag apache
  <parse>
    @type none
  </parse>
</source>

<filter apache>
  @type record_transformer
  <record>
    log ${record["message"]}
  </record>
</filter>

<filter apache>
  @type record_transformer
  remove_keys message
</filter>

<match apache>
  @type http
  endpoint pipeline-endpoint.us-east-1.osis.amazonaws.com/apache-log-pipeline/logs
  json_array true

  <auth>
    method aws_sigv4
    aws_service osis
    aws_region region
    aws_role_arn arn:aws:iam::account-id:role/ingestion-role
  </auth>

  <format>
    @type json
  </format>

  <buffer>
    flush_interval 1s
  </buffer>
</match>
```

You can then configure an OpenSearch Ingestion pipeline like the following, which has HTTP as the source:

```
version: "2"
apache-log-pipeline:
  source:
    http:
      path: "/${pipelineName}/logs"
  processor:
    - grok:
        match:
          log:
            - "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:network_node} %{NOTSPACE:network_host} %{IPORHOST:source_ip}:%{NUMBER:source_port:int} -> %{IPORHOST:destination_ip}:%{NUMBER:destination_port:int} %{GREEDYDATA:details}"
  sink:
    - opensearch:
        hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"]
        index: "index_name"
        aws_region: "region"
        aws_sigv4: true
```

# Using an OpenSearch Ingestion pipeline with machine learning offline batch inference
<a name="configure-clients-ml-commons-batch"></a>

Amazon OpenSearch Ingestion (OSI) pipelines support machine learning (ML) offline batch inference processing to efficiently enrich large volumes of data at low cost. Use offline batch inference whenever you have large datasets that can be processed asynchronously. Offline batch inference works with Amazon Bedrock and SageMaker models. This feature is available in all AWS Regions that support OpenSearch Ingestion with OpenSearch Service domains running version 2.17 or later.

**Note**  
For real-time inference processing, use [Amazon OpenSearch Service ML connectors for third-party platforms](ml-external-connector.md).

Offline batch inference processing leverages a feature of OpenSearch called ML Commons. *ML Commons* provides ML algorithms through transport and REST API calls. Those calls choose the right nodes and resources for each ML request and monitor ML tasks to ensure uptime. In this way, ML Commons allows you to leverage existing open-source ML algorithms and reduce the effort required to develop new ML features. For more information about ML Commons, see [Machine learning](https://docs.opensearch.org/latest/ml-commons-plugin/) in the OpenSearch.org documentation. 

## How it works
<a name="configure-clients-ml-commons-batch-how"></a>

You can create an offline batch inference pipeline on OpenSearch Ingestion by [adding a machine learning inference processor](https://docs.opensearch.org/latest/ingest-pipelines/processors/ml-inference/) to a pipeline. This processor enables your pipeline to connect to AI services like SageMaker to run batch inference jobs. You can configure your processor to connect to your desired AI service through the AI connectors (with [batch predict](https://docs.opensearch.org/latest/ml-commons-plugin/api/model-apis/batch-predict/) support) running on your target domain.

OpenSearch Ingestion uses the `ml_inference` processor with ML Commons to create offline batch inference jobs. ML Commons then uses the [batch predict](https://docs.opensearch.org/latest/ml-commons-plugin/api/model-apis/batch-predict/) API, which performs inference on large datasets in an offline asynchronous mode using a model deployed on external model servers in Amazon Bedrock, Amazon SageMaker, Cohere, or OpenAI. The following diagram shows an OpenSearch Ingestion pipeline that orchestrates multiple components to perform this process end to end:

![\[Three-pipeline architecture of batch AI inference processing.\]](http://docs.aws.amazon.com/opensearch-service/latest/developerguide/images/ml_processor.png)


The pipeline components work as follows:

**Pipeline 1 (Data preparation and transformation):**
+ Source: Data is scanned from your external source supported by OpenSearch Ingestion.
+ Data processors: The raw data is processed and transformed to the correct format for batch inference on the integrated AI service.
+ S3 (Sink): The processed data is staged in an Amazon S3 bucket ready to serve as input for running batch inference jobs on the integrated AI service. 

**Pipeline 2 (Trigger ML batch inference):**
+ Source: Automated S3 event detection of new files created by output of Pipeline 1.
+ `ml_inference` processor: Processor that generates ML inferences through an asynchronous batch job. It connects to AI services through the configured AI connector that's running on your target domain.
+ Task ID: Each batch job is associated with a task ID in ml-commons for tracking and management.
+ OpenSearch ML Commons: Hosts the model for real-time neural search, manages the connectors to remote AI servers, and serves the APIs for batch inference and job management.
+ AI services: OpenSearch ML Commons interacts with AI services like Amazon Bedrock and Amazon SageMaker to perform batch inference on the data, producing predictions or insights. The results are saved asynchronously to a separate S3 file.

**Pipeline 3 (Bulk ingestion):**
+ S3 (source): The results of the batch jobs are stored in S3, which is the source of this pipeline.
+ Data transformation processors: Further processing and transformation are applied to the batch inference output before ingestion. This ensures the data is mapped correctly in the OpenSearch index.
+ OpenSearch index (Sink): The processed results are indexed into OpenSearch for storage, search, and further analysis.

**Note**  
The process described by Pipeline 1 is optional. If you prefer, you can skip that process and simply upload your prepared data to the S3 sink to create batch jobs.

## About the ml_inference processor
<a name="configure-clients-ml-commons-batch-inference-processor"></a>

OpenSearch Ingestion uses a specialized integration between the S3 Scan source and ML inference processor for batch processing. The S3 Scan operates in metadata-only mode to efficiently collect S3 file information without reading the actual file contents. The `ml_inference` processor uses the S3 file URLs to coordinate with ML Commons for batch processing. This design optimizes the batch inference workflow by minimizing unnecessary data transfer during the scanning phase. You define the `ml_inference` processor using parameters. Here is an example: 

```
processor:
    - ml_inference:
        # The endpoint URL of your OpenSearch domain
        host: "https://AWStest-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com"
        
        # Type of inference operation:
        # - batch_predict: for batch processing
        # - predict: for real-time inference
        action_type: "batch_predict"
        
        # Remote ML model service provider (Amazon Bedrock or SageMaker)
        service_name: "bedrock"
        
        # Unique identifier for the ML model
        model_id: "AWSTestModelID123456789abcde"
        
        # S3 path where batch inference results will be stored
        output_path: "s3://amzn-s3-demo-bucket/"
      
        # Supports ISO_8601 notation strings like PT20.345S or PT15M
        # These settings control how long to keep your inputs in the processor for retry on throttling errors
        retry_time_window: "PT9M"
        
        # AWS configuration settings
        aws:
            # AWS Region of your OpenSearch Service domain
            region: "us-west-2"
            # ARN of the IAM role that the pipeline assumes to call ML Commons
            sts_role_arn: "arn:aws:iam::account_id:role/Admin"
        
        # Dead-letter queue settings for storing errors
        dlq:
          s3:
            region: us-west-2
            bucket: batch-inference-dlq
            key_path_prefix: bedrock-dlq
            sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml
            
        # Conditional expression that determines when to trigger the processor
        # In this case, only process when bucket matches "amzn-s3-demo-bucket"
        ml_when: /bucket == "amzn-s3-demo-bucket"
```

### Ingestion performance improvements using the ml_inference processor
<a name="configure-clients-ml-commons-batch-ingestion-performance"></a>

The OpenSearch Ingestion `ml_inference` processor significantly enhances data ingestion performance for ML-enabled search. The processor is ideally suited for use cases requiring machine learning model-generated data, including semantic search, multimodal search, document enrichment, and query understanding. In semantic search, the processor can accelerate the creation and ingestion of large-volume, high-dimensional vectors by an order of magnitude.

The processor's offline batch inference capability offers distinct advantages over real-time model invocation. While real-time processing requires a live model server with capacity limitations, batch inference dynamically scales compute resources on demand and processes data in parallel. For example, when the OpenSearch Ingestion pipeline receives one billion source data requests, it creates 100 S3 files for ML batch inference input. The `ml_inference` processor then initiates a SageMaker batch job using 100 `ml.m4.xlarge` Amazon Elastic Compute Cloud (Amazon EC2) instances, completing the vectorization of one billion requests in 14 hours—a task that would be virtually impossible to accomplish in real-time mode.
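The scaling claim above can be checked with back-of-envelope arithmetic. The following sketch only restates the numbers from this example (one billion requests, 100 files, 100 instances, 14 hours); it is not a benchmark:

```python
# Back-of-envelope math for the batch inference example above.
# All figures come from the scenario in this section; none are measured.
TOTAL_REQUESTS = 1_000_000_000   # one billion source data requests
NUM_FILES = 100                  # S3 files created for batch input
NUM_INSTANCES = 100              # ml.m4.xlarge instances in the batch job
WALL_CLOCK_HOURS = 14            # reported completion time

requests_per_file = TOTAL_REQUESTS // NUM_FILES
requests_per_instance = TOTAL_REQUESTS // NUM_INSTANCES
per_instance_rate = requests_per_instance / (WALL_CLOCK_HOURS * 3600)

print(requests_per_file)         # 10000000 requests per S3 file
print(round(per_instance_rate))  # ~198 embeddings per second per instance
```

At roughly 200 embeddings per second per instance sustained for 14 hours, the same workload in real-time mode would require equivalent continuous throughput from a live endpoint, which illustrates why batch mode suits this volume.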

## Configure the ml_inference processor to ingest data requests for a semantic search
<a name="configure-clients-ml-commons-configuring"></a>

The following procedures walk you through the process of setting up and configuring the OpenSearch Ingestion `ml_inference` processor to ingest one billion data requests for semantic search using a text embedding model.

**Topics**
+ [Step 1: Create connectors and register models in OpenSearch](#configure-clients-ml-commons-configuring-create-connectors)
+ [Step 2: Create an OpenSearch Ingestion pipeline for ML offline batch inference](#configure-clients-ml-commons-configuring-pipeline)
+ [Step 3: Prepare your data for ingestion](#configure-clients-ml-commons-configuring-data)
+ [Step 4: Monitor the batch inference job](#configure-clients-ml-commons-configuring-monitor)
+ [Step 5: Run search](#configure-clients-ml-commons-configuring-semantic-search)

### Step 1: Create connectors and register models in OpenSearch
<a name="configure-clients-ml-commons-configuring-create-connectors"></a>

For the following procedure, use the ML Commons [batch_inference_sagemaker_connector_blueprint](https://github.com/opensearch-project/ml-commons/blob/main/docs/remote_inference_blueprints/batch_inference_sagemaker_connector_blueprint.md) to create a connector and model in Amazon SageMaker. If you prefer to use OpenSearch CloudFormation integration templates, see [(Alternative procedure) Step 1: Create connectors and models using a CloudFormation integration template](#configure-clients-ml-commons-configuring-create-connectors-alternative) later in this section.

**To create connectors and register models in OpenSearch**

1. Create a Deep Java Library (DJL) ML model in SageMaker for batch transform. To view other DJL models, see [semantic_search_with_CFN_template_for_Sagemaker](https://github.com/opensearch-project/ml-commons/blob/main/docs/tutorials/aws/semantic_search_with_CFN_template_for_Sagemaker.md) on GitHub:

   ```
   POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel
   {
      "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker",
      "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines",
      "PrimaryContainer": { 
         "Environment": { 
            "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" 
         },
         "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full"
      }
   }
   ```

1. Create a connector with `batch_predict` as the new `action` type in the `actions` field:

   ```
   POST /_plugins/_ml/connectors/_create
   {
     "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2",
     "version": "1",
     "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2",
     "protocol": "aws_sigv4",
     "credential": {
     "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole"
   },
     "parameters": {
       "region": "us-east-1",
       "service_name": "sagemaker",
       "DataProcessing": {
         "InputFilter": "$.text",
         "JoinSource": "Input",
         "OutputFilter": "$"
       },
       "MaxConcurrentTransforms": 100,
       "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines",
       "TransformInput": {
         "ContentType": "application/json",
         "DataSource": {
           "S3DataSource": {
             "S3DataType": "S3Prefix",
             "S3Uri": "s3://offlinebatch/msmarcotests/"
           }
         },
         "SplitType": "Line"
       },
       "TransformJobName": "djl-batch-transform-1-billion",
       "TransformOutput": {
         "AssembleWith": "Line",
         "Accept": "application/json",
         "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/"
       },
       "TransformResources": {
         "InstanceCount": 100,
         "InstanceType": "ml.m4.xlarge"
       },
       "BatchStrategy": "SingleRecord"
     },
     "actions": [
       {
         "action_type": "predict",
         "method": "POST",
         "headers": {
           "content-type": "application/json"
         },
         "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations",
         "request_body": "${parameters.input}",
         "pre_process_function": "connector.pre_process.default.embedding",
         "post_process_function": "connector.post_process.default.embedding"
       },
       {
         "action_type": "batch_predict",
         "method": "POST",
         "headers": {
           "content-type": "application/json"
         },
         "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob",
         "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}"""
       },
       {
         "action_type": "batch_predict_status",
         "method": "GET",
         "headers": {
           "content-type": "application/json"
         },
         "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob",
         "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}"""
       },
       {
         "action_type": "cancel_batch_predict",
         "method": "POST",
         "headers": {
           "content-type": "application/json"
         },
         "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob",
         "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}"""
       }
     ]
   }
   ```

1. Use the returned connector ID to register the SageMaker model:

   ```
   POST /_plugins/_ml/models/_register
   {
       "name": "SageMaker model for batch",
       "function_name": "remote",
       "description": "test model",
       "connector_id": "example123456789-abcde"
   }
   ```

1. Invoke the model with the `batch_predict` action type:

   ```
   POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict
   {
     "parameters": {
       "TransformJobName": "SM-offline-batch-transform"
     }
   }
   ```

   The response contains a task ID for the batch job:

   ```
   {
    "task_id": "exampleIDabdcefd_1234567",
    "status": "CREATED"
   }
   ```

1. Check the batch job status by calling the Get Task API using the task ID:

   ```
   GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567
   ```

   The response contains the task status:

   ```
   {
     "model_id": "nyWbv5EB_tT1A82ZCu-e",
     "task_type": "BATCH_PREDICTION",
     "function_name": "REMOTE",
     "state": "RUNNING",
     "input_type": "REMOTE",
     "worker_node": [
       "WDZnIMcbTrGtnR4Lq9jPDw"
     ],
     "create_time": 1725496527958,
     "last_update_time": 1725496527958,
     "is_async": false,
     "remote_job": {
       "TransformResources": {
         "InstanceCount": 1,
         "InstanceType": "ml.c5.xlarge"
       },
       "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines",
       "TransformOutput": {
         "Accept": "application/json",
         "AssembleWith": "Line",
         "KmsKeyId": "",
         "S3OutputPath": "s3://offlinebatch/output"
       },
       "CreationTime": 1725496531.935,
       "TransformInput": {
         "CompressionType": "None",
         "ContentType": "application/json",
         "DataSource": {
           "S3DataSource": {
             "S3DataType": "S3Prefix",
             "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json"
           }
         },
         "SplitType": "Line"
       },
       "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15",
       "TransformJobStatus": "InProgress",
       "BatchStrategy": "SingleRecord",
       "TransformJobName": "SM-offline-batch-transform15",
       "DataProcessing": {
         "InputFilter": "$.content",
         "JoinSource": "Input",
         "OutputFilter": "$"
       }
     }
   }
   ```
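When you script this check, the fields to watch are `state` on the task and `TransformJobStatus` under `remote_job`. The following Python sketch parses a Get Task response like the one above (the sample body is abridged from it); fetching the response over HTTP is left out:

```python
import json

# Abridged from the Get Task response shown above.
sample_response = json.loads("""
{
  "model_id": "nyWbv5EB_tT1A82ZCu-e",
  "task_type": "BATCH_PREDICTION",
  "state": "RUNNING",
  "remote_job": {
    "TransformJobName": "SM-offline-batch-transform15",
    "TransformJobStatus": "InProgress"
  }
}
""")

def summarize_task(task):
    """Pull out the task state and the remote SageMaker job status."""
    remote = task.get("remote_job", {})
    return {
        "state": task.get("state"),
        "job_name": remote.get("TransformJobName"),
        "job_status": remote.get("TransformJobStatus"),
        # SageMaker transform jobs end in one of these terminal states.
        "done": remote.get("TransformJobStatus") in ("Completed", "Failed", "Stopped"),
    }

print(summarize_task(sample_response))
```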

#### (Alternative procedure) Step 1: Create connectors and models using a CloudFormation integration template
<a name="configure-clients-ml-commons-configuring-create-connectors-alternative"></a>

If you prefer, you can use AWS CloudFormation to automatically create all required Amazon SageMaker connectors and models for ML inference. This approach simplifies setup by using a preconfigured template available in the Amazon OpenSearch Service console. For more information, see [Using CloudFormation to set up remote inference for semantic search](cfn-template.md).

**To deploy a CloudFormation stack that creates all the required SageMaker connectors and models**

1. Open the Amazon OpenSearch Service console.

1. In the navigation pane, choose **Integrations**.

1. In the Search field, enter **SageMaker**, and then choose **Integration with text embedding models through Amazon SageMaker**.

1. Choose **Configure domain** and then choose **Configure VPC domain** or **Configure public domain**.

1. Enter information in the template fields. For **Enable Offline Batch Inference**, choose **true** to provision resources for offline batch processing.

1. Choose **Create** to create the CloudFormation stack.

1. After the stack is created, open the **Outputs** tab in the CloudFormation console. Locate the **connector_id** and **model_id** values. You will need these values later when you configure the pipeline.

### Step 2: Create an OpenSearch Ingestion pipeline for ML offline batch inference
<a name="configure-clients-ml-commons-configuring-pipeline"></a>

Use the following sample to create an OpenSearch Ingestion pipeline for ML offline batch inference. For more information about creating a pipeline for OpenSearch Ingestion, see [Creating Amazon OpenSearch Ingestion pipelines](creating-pipeline.md).

**Before you begin**

In the following sample, you specify an IAM role ARN for the `sts_role_arn` parameter. Use the following procedure to verify that this role is mapped as a backend role to an OpenSearch role that has ML Commons access.

1. Navigate to the OpenSearch Dashboards plugin for your OpenSearch Service domain. You can find the dashboards endpoint on your domain dashboard on the OpenSearch Service console.

1. From the main menu choose **Security**, **Roles**, and select the **ml_full_access** role.

1. Choose **Mapped users**, **Manage mapping**. 

1. Under **Backend roles**, enter the ARN of the pipeline role that needs permission to call your domain. Here is an example: arn:aws:iam::*111122223333*:role/*pipeline-role*

1. Select **Map** and confirm the user or role shows up under **Mapped users**.
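If you prefer to script the mapping instead of using Dashboards, the OpenSearch Security plugin exposes the same operation as a REST API. The following request is a sketch; the role ARN is a placeholder, and note that a PUT replaces any existing mapping for `ml_full_access`:

```
PUT _plugins/_security/api/rolesmapping/ml_full_access
{
  "backend_roles": [
    "arn:aws:iam::111122223333:role/pipeline-role"
  ]
}
```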

**Sample to create an OpenSearch Ingestion pipeline for ML offline batch inference**

```
version: '2'
extension:
  osis_configuration_metadata:
    builder_type: visual
sagemaker-batch-job-pipeline:
  source:
    s3:
      acknowledgments: true
      delete_s3_objects_on_read: false
      scan:
        buckets:
          - bucket:
              name: name
              data_selection: metadata_only
              filter:
                include_prefix:
                  - sagemaker/sagemaker_djl_batch_input
                exclude_suffix:
                  - .manifest
          - bucket:
              name: name
              data_selection: data_only
              filter:
                include_prefix:
                  - sagemaker/output/
        scheduling:
          interval: PT6M
      aws:
        region: name
      default_bucket_owner: account_ID
      codec:
        ndjson:
          include_empty_objects: false
      compression: none
      workers: '1'
  processor:
    - ml_inference:
        host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"
        aws_sigv4: true
        action_type: "batch_predict"
        service_name: "sagemaker"
        model_id: "model_ID"
        output_path: "s3://AWStest-offlinebatch/sagemaker/output"
        aws:
          region: "us-west-2"
          sts_role_arn: "arn:aws:iam::account_ID:role/Admin"
        ml_when: /bucket == "AWStest-offlinebatch"
        dlq:
          s3:
            region: us-west-2
            bucket: batch-inference-dlq
            key_path_prefix: bedrock-dlq
            sts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml
    - copy_values:
        entries:
          - from_key: /text
            to_key: chapter
          - from_key: /SageMakerOutput
            to_key: chapter_embedding
    - delete_entries:
        with_keys:
          - text
          - SageMakerOutput
  sink:
    - opensearch:
        hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"]
        aws:
          serverless: false
          region: us-west-2
        routes:
          - ml-ingest-route
        index_type: custom
        index: test-nlp-index
  routes:
    - ml-ingest-route: /chapter != null and /title != null
```

### Step 3: Prepare your data for ingestion
<a name="configure-clients-ml-commons-configuring-data"></a>

To prepare your data for ML offline batch inference processing, either prepare the data yourself using your own tools and processes, or use the [OpenSearch Data Prepper](https://docs.opensearch.org/latest/data-prepper/getting-started/). Verify that the data is organized into the correct format, either by using a pipeline to consume the data from your data source or by creating a machine learning dataset.

The following example uses the [MS MARCO](https://microsoft.github.io/msmarco/Datasets.html) dataset, which includes a collection of real user queries for natural language processing tasks. The dataset is structured in JSONL format, where each line represents a request sent to the ML embedding model:

```
{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": ["world war 2"]}
{"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": ["law"]}
{"_id": "597651", "text": "what is amber", "metadata": ["nothing"]}
{"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": ["self immune"]}
...
```

To test using the MS MARCO dataset, imagine a scenario where you construct one billion input requests distributed across 100 files, each containing 10 million requests. The files would be stored in Amazon S3 with the prefix s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/. The OpenSearch Ingestion pipeline would scan these 100 files simultaneously and initiate a SageMaker batch job with 100 workers for parallel processing, enabling efficient vectorization and ingestion of the one billion documents into OpenSearch.
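The sharding scheme described above (fixed-size JSONL files under a common S3 prefix) can be sketched in a few lines of Python. The function and file names here are illustrative, not part of any OpenSearch or AWS API, and the record counts are tiny for demonstration; uploading the shards to S3 is left out:

```python
import json
import os
import tempfile

def write_jsonl_shards(records, out_dir, records_per_file):
    """Write records as fixed-size JSONL shard files; return the paths."""
    paths = []
    for shard, start in enumerate(range(0, len(records), records_per_file)):
        path = os.path.join(out_dir, f"sagemaker_djl_batch_input_{shard:03d}.jsonl")
        with open(path, "w") as f:
            for record in records[start:start + records_per_file]:
                f.write(json.dumps(record) + "\n")
        paths.append(path)
    return paths

# Ten toy records split into shards of four: 4 + 4 + 2.
records = [{"_id": str(i), "text": f"query {i}"} for i in range(10)]
with tempfile.TemporaryDirectory() as tmp:
    shards = write_jsonl_shards(records, tmp, records_per_file=4)
    print(len(shards))  # 3
```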

In production environments, you can use an OpenSearch Ingestion pipeline to generate S3 files for batch inference input. The pipeline supports various [data sources](https://docs.opensearch.org/latest/data-prepper/pipelines/configuration/sources/sources/) and operates on a schedule to continuously transform source data into S3 files. These files are then automatically processed by AI servers through scheduled offline batch jobs, ensuring continuous data processing and ingestion.

### Step 4: Monitor the batch inference job
<a name="configure-clients-ml-commons-configuring-monitor"></a>

You can monitor batch inference jobs using the SageMaker console or the AWS CLI. You can also use the ML Commons task APIs. For example, the following task search returns all running batch jobs:

```
GET /_plugins/_ml/tasks/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "state": "RUNNING"
          }
        }
      ]
    }
  },
  "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"]
}
```

The API returns a list of active batch job tasks:

```
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 0.0,
    "hits": [
      {
        "_index": ".plugins-ml-task",
        "_id": "nyWbv5EB_tT1A82ZCu-e",
        "_score": 0.0,
        "_source": {
          "model_id": "nyWbv5EB_tT1A82ZCu-e",
          "state": "RUNNING",
          "task_type": "BATCH_PREDICTION",
          "create_time": 1725496527958,
          "last_update_time": 1725496527958
        }
      },
      {
        "_index": ".plugins-ml-task",
        "_id": "miKbv5EB_tT1A82ZCu-f",
        "_score": 0.0,
        "_source": {
          "model_id": "miKbv5EB_tT1A82ZCu-f",
          "state": "RUNNING",
          "task_type": "BATCH_PREDICTION",
          "create_time": 1725496528123,
          "last_update_time": 1725496528123
        }
      },
      {
        "_index": ".plugins-ml-task",
        "_id": "kiLbv5EB_tT1A82ZCu-g",
        "_score": 0.0,
        "_source": {
          "model_id": "kiLbv5EB_tT1A82ZCu-g",
          "state": "RUNNING",
          "task_type": "BATCH_PREDICTION",
          "create_time": 1725496529456,
          "last_update_time": 1725496529456
        }
      }
    ]
  }
}
```

### Step 5: Run search
<a name="configure-clients-ml-commons-configuring-semantic-search"></a>

After you confirm that the batch inference job has completed, you can run various types of AI searches, including semantic, hybrid, conversational (with RAG), neural sparse, and multimodal. For more information about the AI search types that OpenSearch Service supports, see [AI search](https://docs.opensearch.org/latest/vector-search/ai-search/index/).

To search raw vectors, use the `knn` query type, provide the `vector` array as input, and specify the `k` number of returned results:

```
GET /my-raw-vector-index/_search
{
  "query": {
    "knn": {
      "my_vector": {
        "vector": [0.1, 0.2, 0.3],
        "k": 2
      }
    }
  }
}
```

To run an AI-powered search, use the `neural` query type. Specify the `query_text` input, the `model_id` of the embedding model you configured in the OpenSearch Ingestion pipeline, and the `k` number of returned results. To exclude embeddings from search results, specify the name of the embedding field in the `_source.excludes` parameter:

```
GET /my-ai-search-index/_search
{
  "_source": {
    "excludes": [
      "output_embedding"
    ]
  },
  "query": {
    "neural": {
      "output_embedding": {
        "query_text": "What is AI search?",
        "model_id": "mBGzipQB2gmRjlv_dOoB",
        "k": 2
      }
    }
  }
}
```

# Using an OpenSearch Ingestion pipeline with OpenTelemetry Collector
<a name="configure-client-otel"></a>

You can use the [OpenTelemetry Collector](https://opentelemetry.io/docs/collector/) to ingest logs, traces, and metrics into OpenSearch Ingestion pipelines. A single pipeline can be used to ingest all logs, traces, and metrics to different indices on a domain or collection. You can also use pipelines to ingest only logs, traces, or metrics individually. 

**Topics**
+ [Prerequisites](#otel-prereqs)
+ [Step 1: Configure the pipeline role](#otel-pipeline-role)
+ [Step 2: Create the pipeline](#create-otel-pipeline)
+ [Cross-account Connectivity](#x-account-connectivity)
+ [Limitations](#otel-limitations)
+ [Recommended CloudWatch Alarms for OpenTelemetry sources](#otel-pipeline-metrics)

## Prerequisites
<a name="otel-prereqs"></a>

While setting up the [OpenTelemetry configuration file](https://opentelemetry.io/docs/collector/configuration/), you must configure the following in order for ingestion to occur: 
+ The ingestion role needs the `osis:Ingest` permission to interact with the pipeline. For more information, see [Ingestion role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-same-account).
+ The endpoint value must include your pipeline endpoint. For example, `https://pipeline-endpoint.us-east-1.osis.amazonaws.com`.
+ The service value must be `osis`.
+ The compression option for the OTLP/HTTP Exporter must match the compression option on the pipeline's selected source.

```
extensions:
    sigv4auth:
        region: "region"
        service: "osis"

exporters:
    otlphttp:
        logs_endpoint: "https://pipeline-endpoint.us-east-1.osis.amazonaws.com/v1/logs"
        metrics_endpoint: "https://pipeline-endpoint.us-east-1.osis.amazonaws.com/v1/metrics"
        traces_endpoint: "https://pipeline-endpoint.us-east-1.osis.amazonaws.com/v1/traces"
        auth:
            authenticator: sigv4auth
        compression: none

service:
    extensions: [sigv4auth]
    pipelines:
        traces:
            receivers: [jaeger]
            exporters: [otlphttp]
```

## Step 1: Configure the pipeline role
<a name="otel-pipeline-role"></a>

After setting up the OpenTelemetry Collector configuration, [set up the pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) that you want to use in your pipeline configuration. The pipeline role doesn't need any permissions specific to the OTLP source; it only needs permissions that grant the pipeline access to the OpenSearch domain or collection.

## Step 2: Create the pipeline
<a name="create-otel-pipeline"></a>

 You can then configure an OpenSearch Ingestion pipeline like the following, which specifies OTLP as the source. You can also configure OpenTelemetry logs, metrics, and traces as individual sources. 

OTLP source pipeline configuration:

```
version: 2
otlp-pipeline:
    source:
        otlp:
            logs_path: /otlp-pipeline/v1/logs
            traces_path: /otlp-pipeline/v1/traces
            metrics_path: /otlp-pipeline/v1/metrics
    sink:
        - opensearch:
            hosts: ["https://search-mydomain.region.es.amazonaws.com"]
            index: "ss4o_metrics-otel-%{yyyy.MM.dd}"
            index_type: custom
            aws:
                region: "region"
```

OpenTelemetry Logs pipeline configuration:

```
version: 2
otel-logs-pipeline:
  source:
    otel_logs_source:
        path: /otel-logs-pipeline/v1/logs
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.region.es.amazonaws.com"]
        index: "ss4o_logs-otel-%{yyyy.MM.dd}"
        index_type: custom
        aws:
            region: "region"
```

OpenTelemetry Metrics pipeline configuration:

```
version: 2
otel-metrics-pipeline:
  source:
    otel_metrics_source:
        path: /otel-metrics-pipeline/v1/metrics
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.region.es.amazonaws.com"]
        index: "ss4o_metrics-otel-%{yyyy.MM.dd}"
        index_type: custom
        aws:
            region: "region"
```

OpenTelemetry Traces pipeline configuration:

```
version: 2
otel-trace-pipeline:
  source:
    otel_trace_source:
        path: /otel-traces-pipeline/v1/traces
  sink:
    - opensearch:
        hosts: ["https://search-mydomain.region.es.amazonaws.com"]
        index: "ss4o_traces-otel-%{yyyy.MM.dd}"
        index_type: custom
        aws:
            region: "region"
```

You can use a preconfigured blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md). 

## Cross-account Connectivity
<a name="x-account-connectivity"></a>

 OpenSearch Ingestion pipelines with OpenTelemetry sources have cross-account ingestion capability. Amazon OpenSearch Ingestion enables you to share pipelines across AWS accounts from a virtual private cloud (VPC) to a pipeline endpoint in a separate VPC. For more information, see [Configuring OpenSearch Ingestion pipelines for cross-account ingestion](cross-account-pipelines.md). 

## Limitations
<a name="otel-limitations"></a>

An OpenSearch Ingestion pipeline rejects requests larger than the value of the source's `max_request_length` option, which defaults to 10 MB and can be configured up to a maximum of 20 MB.
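For example, the following sketch raises the request size limit on an OpenTelemetry logs source; the pipeline name and path are placeholders:

```
version: 2
otel-logs-pipeline:
  source:
    otel_logs_source:
        path: /otel-logs-pipeline/v1/logs
        max_request_length: 20mb
```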

## Recommended CloudWatch Alarms for OpenTelemetry sources
<a name="otel-pipeline-metrics"></a>

The following CloudWatch metrics are recommended for monitoring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the number of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can set up CloudWatch alarms to perform an action when one of these metrics exceeds a specified value for a specified amount of time.

The CloudWatch metrics for the OTLP source are formatted as `{pipeline-name}.otlp.{logs | traces | metrics}.{metric-name}`. For example, `otel-pipeline.otlp.metrics.requestTimeouts.count`.

If you use an individual OpenTelemetry source, the metrics are formatted as `{pipeline-name}.{source-name}.{metric-name}`. For example, `trace-pipeline.otel_trace_source.requestTimeouts.count`.
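As a quick illustration of these two naming formats, the following helper functions (hypothetical, not part of any AWS SDK) assemble metric names that you can paste into a CloudWatch alarm definition:

```python
def otlp_metric(pipeline, data_type, metric):
    """OTLP source format: {pipeline-name}.otlp.{logs|traces|metrics}.{metric-name}"""
    return f"{pipeline}.otlp.{data_type}.{metric}"

def individual_source_metric(pipeline, source, metric):
    """Individual source format: {pipeline-name}.{source-name}.{metric-name}"""
    return f"{pipeline}.{source}.{metric}"

print(otlp_metric("otel-pipeline", "metrics", "requestTimeouts.count"))
# otel-pipeline.otlp.metrics.requestTimeouts.count
print(individual_source_metric("trace-pipeline", "otel_trace_source", "requestTimeouts.count"))
# trace-pipeline.otel_trace_source.requestTimeouts.count
```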

All three OpenTelemetry data types emit the same metrics. For brevity, the following table lists the metrics only for OTLP source log data.


| Metric | Description | 
| --- | --- | 
| `otel-pipeline.BlockingBuffer.bufferUsage.value` | Indicates how much of the buffer is being utilized. | 
| `otel-pipeline.otlp.logs.requestTimeouts.count` | The number of requests that have timed out. | 
| `otel-pipeline.otlp.logs.requestsReceived.count` | The number of requests received from the OpenTelemetry Collector. | 
| `otel-pipeline.otlp.logs.badRequests.count` | The number of malformed requests received from the OpenTelemetry Collector. | 
| `otel-pipeline.otlp.logs.requestsTooLarge.count` | The number of requests received from the OpenTelemetry Collector that exceed the 20 MB maximum. | 
| `otel-pipeline.otlp.logs.internalServerError.count` | The number of HTTP 500 errors returned in response to requests from the OpenTelemetry Collector. | 
| `otel-pipeline.opensearch.bulkBadRequestErrors.count` | Count of errors during bulk requests due to malformed requests. | 
| `otel-pipeline.opensearch.bulkRequestLatency.avg` | Average latency for bulk write requests made to OpenSearch. | 
| `otel-pipeline.opensearch.bulkRequestNotFoundErrors.count` | Number of bulk requests that failed because the target data could not be found. | 
| `otel-pipeline.opensearch.bulkRequestNumberOfRetries.count` | Number of retries by OpenSearch Ingestion pipelines to write to the OpenSearch cluster. | 
| `otel-pipeline.opensearch.bulkRequestSizeBytes.sum` | Total size in bytes of all bulk requests made to OpenSearch. | 
| `otel-pipeline.opensearch.documentErrors.count` | Number of errors when sending documents to OpenSearch. The documents causing the errors will be sent to the DLQ. | 
| `otel-pipeline.opensearch.documentsSuccess.count` | Number of documents successfully written to an OpenSearch cluster or collection. | 
| `otel-pipeline.opensearch.documentsSuccessFirstAttempt.count` | Number of documents successfully indexed in OpenSearch on the first attempt. | 
| `otel-pipeline.opensearch.documentsVersionConflictErrors.count` | Count of errors due to version conflicts in documents during processing. | 
| `otel-pipeline.opensearch.PipelineLatency.avg` | Average latency for the OpenSearch Ingestion pipeline to process data, from reading at the source to writing to the destination. | 
| `otel-pipeline.opensearch.PipelineLatency.max` | Maximum latency for the OpenSearch Ingestion pipeline to process data, from reading at the source to writing to the destination. | 
| `otel-pipeline.opensearch.recordsIn.count` | Count of records successfully ingested into OpenSearch. This metric is essential for tracking the volume of data being processed and stored. | 
| `otel-pipeline.opensearch.s3.dlqS3RecordsFailed.count` | Number of records that failed to write to the DLQ. | 
| `otel-pipeline.opensearch.s3.dlqS3RecordsSuccess.count` | Number of records written to the DLQ. | 
| `otel-pipeline.opensearch.s3.dlqS3RequestLatency.count` | Count of latency measurements for requests to the Amazon S3 dead-letter queue. | 
| `otel-pipeline.opensearch.s3.dlqS3RequestLatency.sum` | Total latency of all requests to the Amazon S3 dead-letter queue. | 
| `otel-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum` | Total size in bytes of all requests made to the Amazon S3 dead-letter queue. | 
| `otel-pipeline.recordsProcessed.count` | Total number of records processed in the pipeline, a key metric for overall throughput. | 
| `otel-pipeline.opensearch.bulkRequestInvalidInputErrors.count` | Count of bulk request errors in OpenSearch due to invalid input, crucial for monitoring data quality and operational issues. | 
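You can act on these metrics with a CloudWatch alarm. The following CloudFormation sketch alarms on the request-timeout metric from the preceding table; it assumes a pipeline named `otel-pipeline` whose metrics are published to the `AWS/OSIS` namespace, so adjust the metric name, period, and threshold for your workload:

```
Resources:
  RequestTimeoutAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
      AlarmName: otel-pipeline-otlp-request-timeouts
      Namespace: AWS/OSIS
      MetricName: otel-pipeline.otlp.logs.requestTimeouts.count
      Statistic: Sum
      Period: 300
      EvaluationPeriods: 1
      Threshold: 0
      ComparisonOperator: GreaterThanThreshold
      # Timeouts are intermittent, so don't alarm when no data is reported
      TreatMissingData: notBreaching
```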

# Using an OpenSearch Ingestion pipeline with Amazon Managed Service for Prometheus
<a name="configure-client-prometheus"></a>

You can use Amazon Managed Service for Prometheus as a destination for your OpenSearch Ingestion pipeline to store metrics in time series format. The Prometheus sink allows you to send OpenTelemetry metrics or other time series data from your pipeline to an Amazon Managed Service for Prometheus workspace for monitoring, alerting, and analysis.

The `prometheus` sink plugin enables OpenSearch Ingestion pipelines to write metrics data to Amazon Managed Service for Prometheus workspaces using the Prometheus remote write protocol. This integration allows you to:
+ Store time series metrics data in Amazon Managed Service for Prometheus
+ Monitor and alert on metrics using Amazon Managed Service for Prometheus and Amazon Managed Grafana
+ Route metrics to multiple destinations simultaneously (for example, OpenSearch and Amazon Managed Service for Prometheus)
+ Process OpenTelemetry metrics from external agents or generate metrics within the pipeline

**Topics**
+ [Prerequisites](#prometheus-prereqs)
+ [Step 1: Configure the pipeline role](#prometheus-pipeline-role)
+ [Step 2: Create the pipeline](#prometheus-pipeline)
+ [Monitoring and troubleshooting](#prometheus-monitoring)
+ [Limitations](#prometheus-limitations)
+ [Best practices](#prometheus-best-practices)

## Prerequisites
<a name="prometheus-prereqs"></a>

Before you configure the Prometheus sink, ensure you have the following:
+ **Amazon Managed Service for Prometheus workspace**: Create a workspace in the same AWS account and AWS Region as your OpenSearch Ingestion pipeline. For instructions, see [Creating a workspace](https://docs.aws.amazon.com/prometheus/latest/userguide/AMP-onboard-create-workspace.html) in the *Amazon Managed Service for Prometheus User Guide*.
+ **IAM permissions**: Configure an IAM role with permissions to write to Amazon Managed Service for Prometheus. For more information, see [Step 1: Configure the pipeline role](#prometheus-pipeline-role).

**Note**  
Amazon Managed Service for Prometheus workspaces must use AWS service-managed AWS KMS keys. Customer-managed AWS KMS keys are not currently supported for Amazon Managed Service for Prometheus sinks in OpenSearch Ingestion pipelines.

## Step 1: Configure the pipeline role
<a name="prometheus-pipeline-role"></a>

The Prometheus sink automatically inherits the [pipeline role's](pipeline-security-overview.md#pipeline-security-sink) IAM permissions for authentication, so no additional role configuration (like `sts_role_arn`) is required in the sink settings.

The following sample policy shows the required permissions for using Amazon Managed Service for Prometheus as a sink:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AMPRemoteWrite",
      "Effect": "Allow",
      "Action": [
        "aps:RemoteWrite"
      ],
      "Resource": "arn:aws:aps:region:account-id:workspace/workspace-id"
    }
  ]
}
```

Replace the following placeholders:
+ `region`: Your AWS Region (for example, `us-east-1`)
+ `account-id`: Your AWS account ID
+ `workspace-id`: Your Amazon Managed Service for Prometheus workspace ID

You must attach these permissions to your pipeline role.

Ensure your pipeline role has a trust relationship that allows OpenSearch Ingestion to assume it:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```

## Step 2: Create the pipeline
<a name="prometheus-pipeline"></a>

After you've set up your permissions, you can configure an OpenSearch Ingestion pipeline to use Amazon Managed Service for Prometheus as a sink.

### Basic configuration
<a name="prometheus-basic-config"></a>

The following example shows a minimal Prometheus sink configuration:

```
version: "2"
sink:
  - prometheus:
      url: "https://aps-workspaces.region.amazonaws.com/workspaces/workspace-id/api/v1/remote_write"
      aws:
        region: "region"
```

You must specify the `url` option within the `prometheus` sink configuration, which is the Amazon Managed Service for Prometheus remote write endpoint. To format the URL, locate your workspace ID in the Amazon Managed Service for Prometheus console and construct the URL as follows: `https://aps-workspaces.region.amazonaws.com/workspaces/workspace-id/api/v1/remote_write`.
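If you don't want to construct the URL by hand, you can retrieve the workspace endpoint with the AWS CLI and append the remote write path. This sketch assumes the `prometheusEndpoint` field returned by `describe-workspace`:

```
# Returns the base endpoint for the workspace
aws amp describe-workspace \
  --workspace-id ws-a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
  --query 'workspace.prometheusEndpoint' \
  --output text
```

Append `api/v1/remote_write` to the returned endpoint to form the sink `url`.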

### Configuration options
<a name="prometheus-config-options"></a>

Use the following options to configure batching and flushing behavior for the Prometheus sink:


**Prometheus sink configuration options**  

| Option | Required | Type | Description | 
| --- | --- | --- | --- | 
| `max_events` | No | Integer | The maximum number of events to accumulate before flushing to Prometheus. Default is 1000. | 
| `max_request_size` | No | Byte count | The maximum size of the request payload before flushing. Default is 1mb. | 
| `flush_interval` | No | Duration | The maximum amount of time to wait before flushing events. Default is 10s. Maximum allowed value is 60s. | 
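Putting these together, a sink that tunes the batching behavior might look like the following sketch (the values are illustrative):

```
sink:
  - prometheus:
      url: "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-a1b2c3d4-5678-90ab-cdef-EXAMPLE11111/api/v1/remote_write"
      # Flush after 2000 events, 2mb of payload, or 30 seconds, whichever comes first
      max_events: 2000
      max_request_size: 2mb
      flush_interval: 30s
      aws:
        region: "us-east-1"
```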

### Example pipelines
<a name="prometheus-example-pipelines"></a>

**Example 1: OpenTelemetry metrics to Amazon Managed Service for Prometheus**

This pipeline receives OpenTelemetry metrics from an external agent and writes them to Amazon Managed Service for Prometheus:

```
version: "2"
source:
  otel_metrics_source:
    path: "/v1/metrics"
    output_format: otel

sink:
  - prometheus:
      url: "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-a1b2c3d4-5678-90ab-cdef-EXAMPLE11111/api/v1/remote_write"
      aws:
        region: "us-east-1"
```

**Example 2: Dual sink - OpenSearch and Amazon Managed Service for Prometheus**

This pipeline routes metrics to both OpenSearch and Amazon Managed Service for Prometheus:

```
version: "2"
source:
  otel_metrics_source:
    path: "/v1/metrics"
    output_format: otel

sink:
  - opensearch:
      hosts:
        - "https://search-domain-endpoint.us-east-1.es.amazonaws.com"
      index: "metrics-%{yyyy.MM.dd}"
      aws:
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::123456789012:role/OSI-Pipeline-Role"

  - prometheus:
      url: "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-a1b2c3d4-5678-90ab-cdef-EXAMPLE11111/api/v1/remote_write"
      aws:
        region: "us-east-1"
```

**Example 3: Metrics with filtering**

This pipeline filters metrics before sending to Amazon Managed Service for Prometheus:

```
version: "2"
source:
  otel_metrics_source:
    path: "/v1/metrics"
    output_format: otel

processor:
  - drop_events:
      drop_when: '/name != "http.server.duration" and /name != "http.client.duration"'

sink:
  - prometheus:
      url: "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-a1b2c3d4-5678-90ab-cdef-EXAMPLE11111/api/v1/remote_write"
      aws:
        region: "us-east-1"
```

You can use a preconfigured Amazon Managed Service for Prometheus blueprint to create these pipelines. For more information, see [Working with blueprints](pipeline-blueprint.md).

### Creating a pipeline with Amazon Managed Service for Prometheus sink
<a name="prometheus-create-pipeline"></a>

#### Using the AWS Console
<a name="prometheus-console"></a>

1. Navigate to the OpenSearch Service console.

1. Choose **Pipelines** under **Ingestion**.

1. Choose **Create pipeline**.

1. Select **Build using blueprint** and choose the **OpenTelemetry metrics to Amazon Prometheus** blueprint.

1. Configure the pipeline:
   + Enter your Amazon Managed Service for Prometheus workspace ID
   + Specify the pipeline role ARN
   + Configure source and processor settings as needed

1. Review and create the pipeline.

#### Using the AWS CLI
<a name="prometheus-cli"></a>

Create a pipeline configuration file (for example, `amp-pipeline.yaml`) with your desired configuration, then run:

```
aws osis create-pipeline \
  --pipeline-name my-amp-pipeline \
  --min-units 2 \
  --max-units 4 \
  --pipeline-configuration-body file://amp-pipeline.yaml
```
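You can optionally check the configuration file for errors before creating the pipeline:

```
aws osis validate-pipeline \
  --pipeline-configuration-body file://amp-pipeline.yaml
```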

#### Using AWS CloudFormation
<a name="prometheus-cfn"></a>

```
Resources:
  MyAMPPipeline:
    Type: AWS::OSIS::Pipeline
    Properties:
      PipelineName: my-amp-pipeline
      MinUnits: 2
      MaxUnits: 4
      PipelineConfigurationBody: |
        version: "2"
        source:
          otel_metrics_source:
            path: "/v1/metrics"
            output_format: otel
        sink:
          - prometheus:
              url: "https://aps-workspaces.us-east-1.amazonaws.com/workspaces/ws-a1b2c3d4-5678-90ab-cdef-EXAMPLE11111/api/v1/remote_write"
              aws:
                region: "us-east-1"
```

## Monitoring and troubleshooting
<a name="prometheus-monitoring"></a>

### CloudWatch metrics
<a name="prometheus-cloudwatch-metrics"></a>

Monitor your pipeline's performance using CloudWatch metrics:
+ `DocumentsWritten`: Number of metrics successfully written to Amazon Managed Service for Prometheus
+ `DocumentsWriteFailed`: Number of metrics that failed to write
+ `RequestLatency`: Latency of remote write requests

### Common issues
<a name="prometheus-troubleshooting"></a>

**Issue**: Pipeline fails to write to Amazon Managed Service for Prometheus

**Solutions**:
+ Verify the workspace ID and region in the URL are correct
+ Ensure the pipeline role has `aps:RemoteWrite` permission
+ Check that the workspace uses service-managed AWS KMS keys
+ Verify the pipeline and workspace are in the same AWS account

**Issue**: Authentication errors

**Solutions**:
+ Verify the trust relationship allows `osis-pipelines.amazonaws.com` to assume the pipeline role
+ Ensure the pipeline role has the required `aps:RemoteWrite` permission

**Issue**: High latency or throttling

**Solutions**:
+ Increase pipeline capacity units
+ Implement batching in the processor
+ Review Amazon Managed Service for Prometheus service quotas

## Limitations
<a name="prometheus-limitations"></a>

Consider the following limitations when you set up an OpenSearch Ingestion pipeline for Amazon Managed Service for Prometheus:
+ Amazon Managed Service for Prometheus workspaces must use AWS service-managed AWS KMS keys. Customer-managed AWS KMS keys are not currently supported.
+ The pipeline and Amazon Managed Service for Prometheus workspace must be in the same AWS account.

## Best practices
<a name="prometheus-best-practices"></a>
+ **Use the same IAM role**: The Prometheus sink automatically uses the pipeline role. If other sinks are used, ensure the `sts_role_arn` is the same as the pipeline role
+ **Monitor metrics**: Set up CloudWatch alarms for failed writes and high latency
+ **Implement filtering**: Use processors to filter unnecessary metrics before sending to Amazon Managed Service for Prometheus
+ **Right-size capacity**: Start with minimum capacity and scale based on metrics volume
+ **Use blueprints**: Leverage pre-configured blueprints for common use cases

# Using an OpenSearch Ingestion pipeline with Kafka
<a name="configure-client-self-managed-kafka"></a>

You can use the [Kafka](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/) plugin to stream data from self-managed Kafka clusters to Amazon OpenSearch Service domains and OpenSearch Serverless collections. OpenSearch Ingestion supports connections from Kafka clusters configured with either public or private (VPC) networking. This topic outlines the prerequisites and steps to set up an ingestion pipeline, including configuring network settings and authentication methods such as mutual TLS (mTLS), SASL/SCRAM, or IAM.

## Migrating data from public Kafka clusters
<a name="self-managaged-kafka-public"></a>

You can use OpenSearch Ingestion pipelines to migrate data from a public self-managed Kafka cluster, which means that the cluster's DNS name can be publicly resolved. To do so, set up an OpenSearch Ingestion pipeline with self-managed Kafka as the source and OpenSearch Service or OpenSearch Serverless as the destination. This processes your streaming data from a self-managed source cluster to an AWS-managed destination domain or collection.

### Prerequisites
<a name="self-managaged-kafka-public-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a self-managed Kafka cluster with a public network configuration. The cluster should contain the data you want to ingest into OpenSearch Service.

1. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see [Creating OpenSearch Service domains](createupdatedomains.md#createdomains) and [Creating collections](serverless-create.md).

1. Set up authentication on your self-managed cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection. 

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).

### Step 1: Configure the pipeline role
<a name="self-managed-kafka-public-pipeline-role"></a>

After you have your Kafka pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add permission to write to an OpenSearch Service domain or OpenSearch Serverless collection, as well as permission to read secrets from Secrets Manager.
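For example, the Secrets Manager portion of the pipeline role policy might look like the following; replace the `Resource` ARN with the ARN of the secret that holds your Kafka credentials:

```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "SecretsManagerReadAccess",
      "Effect": "Allow",
      "Action": ["secretsmanager:GetSecretValue"],
      "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
    }
  ]
}
```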

### Step 2: Create the pipeline
<a name="self-managed-kafka-public-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies Kafka as the source. 

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.

You can also migrate data from a source Confluent Kafka cluster to an OpenSearch Serverless VPC collection. Ensure you provide a network access policy within the pipeline configuration. You can use a Confluent schema registry to define a Confluent schema.

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

You can use a preconfigured blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

## Migrating data from Kafka clusters in a VPC
<a name="self-managaged-kafka-private"></a>

You can also use OpenSearch Ingestion pipelines to migrate data from a self-managed Kafka cluster running in a VPC. To do so, set up an OpenSearch Ingestion pipeline with self-managed Kafka as the source and OpenSearch Service or OpenSearch Serverless as the destination. This processes your streaming data from a self-managed source cluster to an AWS-managed destination domain or collection.

### Prerequisites
<a name="self-managaged-kafka-private-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a self-managed Kafka cluster with a VPC network configuration that contains the data you want to ingest into OpenSearch Service. 

1. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see [Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains) and [Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create).

1. Set up authentication on your self-managed cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html).

1. Obtain the ID of the VPC that has access to the self-managed Kafka cluster. Choose the VPC CIDR to be used by OpenSearch Ingestion.
**Note**  
If you're using the AWS Management Console to create your pipeline, you must also attach your OpenSearch Ingestion pipeline to your VPC in order to use self-managed Kafka. To do so, find the **Network configuration** section, select the **Attach to VPC** checkbox, and choose your CIDR from one of the provided default options, or select your own. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).  
To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and self-managed Kafka, ensure that the self-managed Kafka VPC CIDR is different from the CIDR for OpenSearch Ingestion.

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection. 

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).

### Step 1: Configure the pipeline role
<a name="self-managed-kafka-private-pipeline-role"></a>

After you have your pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add the following permissions in the role:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        { 
            "Effect": "Allow",
            "Action": [ 
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": { 
               "StringEquals": 
                    {
                        "aws:RequestTag/OSISManaged": "true"
                    } 
            } 
        }
    ]
}
```

------

You must provide the above Amazon EC2 permissions on the IAM role that you use to create the OpenSearch Ingestion pipeline because the pipeline uses these permissions to create and delete a network interface in your VPC. The pipeline can only access the Kafka cluster through this network interface.

### Step 2: Create the pipeline
<a name="self-managed-kafka-private-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies Kafka as the source.

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.

You can also migrate data from a source Confluent Kafka cluster to an OpenSearch Serverless VPC collection. Ensure you provide a network access policy within the pipeline configuration. You can use a Confluent schema registry to define a Confluent schema.

```
version: "2"
kafka-pipeline:
  source:
    kafka:
      encryption:
        type: "ssl"
      topics:
        - name: "topic-name"
          group_id: "group-id"
      bootstrap_servers:
        - "bootstrap-server.us-east-1.aws.private.confluent.cloud:9092"
      authentication:
        sasl:
          plain:
            username: ${aws_secrets:confluent-kafka-secret:username}
            password: ${aws_secrets:confluent-kafka-secret:password}
      schema:
        type: confluent
        registry_url: https://my-registry.us-east-1.aws.confluent.cloud
        api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}"
        api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}"
        basic_auth_credentials_source: "USER_INFO"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
      index: "confluent-index"
extension:
  aws:
    secrets:
      confluent-kafka-secret:
        secret_id: "my-kafka-secret"
        region: "us-east-1"
      schema-secret:
        secret_id: "my-self-managed-kafka-schema"
        region: "us-east-1"
```

You can use a preconfigured blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

# Migrating data from self-managed OpenSearch clusters using Amazon OpenSearch Ingestion
<a name="configure-client-self-managed-opensearch"></a>

You can use an Amazon OpenSearch Ingestion pipeline with self-managed OpenSearch or Elasticsearch to migrate data to Amazon OpenSearch Service domains and OpenSearch Serverless collections. OpenSearch Ingestion supports both public and private network configurations for the migration of data from self-managed OpenSearch and Elasticsearch. 

## Migrating from public OpenSearch clusters
<a name="self-managaged-opensearch-public"></a>

You can use OpenSearch Ingestion pipelines to migrate data from a self-managed OpenSearch or Elasticsearch cluster with a public configuration, which means that the cluster's DNS name can be publicly resolved. To do so, set up an OpenSearch Ingestion pipeline with self-managed OpenSearch or Elasticsearch as the source and OpenSearch Service or OpenSearch Serverless as the destination. This effectively migrates your data from a self-managed source cluster to an AWS-managed destination domain or collection.

### Prerequisites
<a name="self-managaged-opensearch-public-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a self-managed OpenSearch or Elasticsearch cluster that contains the data you want to migrate and configure a public DNS name. For instructions, see [Create a cluster](https://opensearch.org/docs/latest/tuning-your-cluster/) in the OpenSearch documentation.

1. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see [Creating OpenSearch Service domains](createupdatedomains.md#createdomains) and [Creating collections](serverless-create.md).

1. Set up authentication on your self-managed cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html).

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection. 

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `resource` with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version":"2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).

### Step 1: Configure the pipeline role
<a name="self-managed-opensearch-public-pipeline-role"></a>

After you have your OpenSearch pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add permission to write to an OpenSearch Service domain or OpenSearch Serverless collection, as well as permission to read secrets from Secrets Manager.

### Step 2: Create the pipeline
<a name="self-managed-opensearch-public-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies OpenSearch as the source. 

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.

You can also migrate data from a source OpenSearch or Elasticsearch cluster to an OpenSearch Serverless VPC collection. Ensure you provide a network access policy within the pipeline configuration.

```
version: "2"
opensearch-migration-pipeline:
  source:
    opensearch:
      acknowledgments: true
      hosts: [ "https://my-self-managed-cluster-name:9200" ]
      indices:
        include:
          - index_name_regex: "include-.*"
        exclude:
          - index_name_regex: '\..*'
      authentication:
        username: ${aws_secrets:secret:username}
        password: ${aws_secrets:secret:password}
      scheduling:
        interval: "PT2H"
        index_read_count: 3
        start_time: "2023-06-02T22:01:30.00Z"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
          #Uncomment the following lines if your destination is an OpenSearch Serverless collection
          #serverless: true
          # serverless_options:
          #     network_policy_name: "network-policy-name"
      index: "${getMetadata(\"opensearch-index\")}"
      document_id: "${getMetadata(\"opensearch-document_id\")}"
      enable_request_compression: true
      dlq:
        s3:
          bucket: "bucket-name"
          key_path_prefix: "apache-log-pipeline/logs/dlq"
          region: "us-east-1"
extension:
  aws:
    secrets:
      secret:
        secret_id: "my-opensearch-secret"
        region: "us-east-1"
        refresh_interval: PT1H
```

You can use a preconfigured blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).
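
The `include` and `exclude` entries in the sample source above select indices by regular expression (for example, excluding system indices that start with a dot). The following Python sketch illustrates the selection logic with full-string matching; it is an illustration only, not the actual Data Prepper implementation.

```python
import re

def select_indices(indices, include_patterns, exclude_patterns):
    """Keep index names that fully match an include pattern and no exclude pattern."""
    selected = []
    for name in indices:
        included = any(re.fullmatch(p, name) for p in include_patterns)
        excluded = any(re.fullmatch(p, name) for p in exclude_patterns)
        if included and not excluded:
            selected.append(name)
    return selected

# Patterns from the sample pipeline configuration above:
# include "include-.*", exclude indices that start with a dot.
names = ["include-logs", "include-metrics", ".kibana", "other-index"]
print(select_indices(names, ["include-.*"], [r"\..*"]))  # → ['include-logs', 'include-metrics']
```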

## Migrating data from OpenSearch clusters in a VPC
<a name="self-managaged-opensearch-private"></a>

You can also use OpenSearch Ingestion pipelines to migrate data from a self-managed OpenSearch or Elasticsearch cluster running in a VPC. To do so, set up an OpenSearch Ingestion pipeline with self-managed OpenSearch or Elasticsearch as the source and OpenSearch Service or OpenSearch Serverless as the destination. This effectively migrates your data from a self-managed source cluster to an AWS-managed destination domain or collection.

### Prerequisites
<a name="self-managaged-opensearch-private-prereqs"></a>

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

1. Create a self-managed OpenSearch or Elasticsearch cluster with a VPC network configuration that contains the data you want to migrate. 

1. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see [Creating OpenSearch Service domains](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/createupdatedomains.html#createdomains) and [Creating collections](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-manage.html#serverless-create).

1. Set up authentication on your self-managed cluster with AWS Secrets Manager. Enable secrets rotation by following the steps in [Rotate AWS Secrets Manager secrets](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html).

1. Obtain the ID of the VPC that has access to self-managed OpenSearch or Elasticsearch. Choose the VPC CIDR to be used by OpenSearch Ingestion.
**Note**  
If you're using the AWS Management Console to create your pipeline, you must also attach your OpenSearch Ingestion pipeline to your VPC in order to use self-managed OpenSearch or Elasticsearch. To do so, find the **Source network options** section, select the **Attach to VPC** checkbox, and choose your CIDR from one of the provided default options. You can use any CIDR from a private address space as defined in the [RFC 1918 Best Current Practice](https://datatracker.ietf.org/doc/html/rfc1918).  
To provide a custom CIDR, select **Other** from the dropdown menu. To avoid a collision in IP addresses between OpenSearch Ingestion and self-managed OpenSearch, ensure that the self-managed OpenSearch VPC CIDR is different from the CIDR for OpenSearch Ingestion. 

1. Attach a [resource-based policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) to your domain or a [data access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-data-access.html) to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection. 

   The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the `Resource` element with your own ARN. 

------
#### [ JSON ]

****  

   ```
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::444455556666:role/pipeline-role"
         },
         "Action": [
           "es:DescribeDomain",
           "es:ESHttp*"
         ],
         "Resource": [
           "arn:aws:es:us-east-1:111122223333:domain/domain-name"
         ]
       }
     ]
   }
   ```

------

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Setting up roles and users in Amazon OpenSearch Ingestion](pipeline-security-overview.md).
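
The CIDR guidance in the prerequisites above (use an RFC 1918 private range that doesn't collide with the self-managed OpenSearch VPC) can be checked with the Python standard library. The following is a small sketch; the example CIDRs are assumptions for illustration.

```python
import ipaddress

# The three private address blocks defined by RFC 1918.
RFC1918_BLOCKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]

def valid_ingestion_cidr(ingestion_cidr: str, source_vpc_cidr: str) -> bool:
    """True if the ingestion CIDR is private (RFC 1918) and doesn't overlap the source VPC."""
    ingestion = ipaddress.ip_network(ingestion_cidr)
    source = ipaddress.ip_network(source_vpc_cidr)
    is_private = any(ingestion.subnet_of(block) for block in RFC1918_BLOCKS)
    return is_private and not ingestion.overlaps(source)

# Example: the source VPC uses 10.0.0.0/16, so pick a non-overlapping private range.
print(valid_ingestion_cidr("172.31.0.0/16", "10.0.0.0/16"))  # → True
print(valid_ingestion_cidr("10.0.1.0/24", "10.0.0.0/16"))    # → False (overlaps the source VPC)
```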

### Step 1: Configure the pipeline role
<a name="self-managed-opensearch-private-pipeline-role"></a>

After you have your pipeline prerequisites set up, [configure the pipeline role](pipeline-security-overview.md#pipeline-security-sink) that you want to use in your pipeline configuration, and add the following permissions in the role:

------
#### [ JSON ]

****  

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SecretsManagerReadAccess",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetSecretValue"
            ],
            "Resource": ["arn:aws:secretsmanager:us-east-1:111122223333:secret:secret-name"]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:AttachNetworkInterface",
                "ec2:CreateNetworkInterface",
                "ec2:CreateNetworkInterfacePermission",
                "ec2:DeleteNetworkInterface",
                "ec2:DeleteNetworkInterfacePermission",
                "ec2:DetachNetworkInterface",
                "ec2:DescribeNetworkInterfaces"
            ],
            "Resource": [
                "arn:aws:ec2:*:*:network-interface/*",
                "arn:aws:ec2:*:*:subnet/*",
                "arn:aws:ec2:*:*:security-group/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeDhcpOptions",
                "ec2:DescribeRouteTables",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeSubnets",
                "ec2:DescribeVpcs",
                "ec2:Describe*"
            ],
            "Resource": "*"
        },
        { 
            "Effect": "Allow",
            "Action": [ 
                "ec2:CreateTags"
            ],
            "Resource": "arn:aws:ec2:*:*:network-interface/*",
            "Condition": { 
               "StringEquals": 
                    {
                        "aws:RequestTag/OSISManaged": "true"
                    } 
            } 
        }
    ]
}
```

------

You must provide the preceding Amazon EC2 permissions on the IAM role that you use to create the OpenSearch Ingestion pipeline because the pipeline uses these permissions to create and delete a network interface in your VPC. The pipeline can only access the OpenSearch cluster through this network interface.

### Step 2: Create the pipeline
<a name="self-managed-opensearch-private-pipeline"></a>

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies OpenSearch as the source. 

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.

You can also migrate data from a source OpenSearch or Elasticsearch cluster to an OpenSearch Serverless VPC collection. Ensure you provide a network access policy within the pipeline configuration.

```
version: "2"
opensearch-migration-pipeline:
  source:
    opensearch:
      acknowledgments: true
      hosts: [ "https://my-self-managed-cluster-name:9200" ]
      indices:
        include:
          - index_name_regex: "include-.*"
        exclude:
          - index_name_regex: '\..*'
      authentication:
        username: ${aws_secrets:secret:username}
        password: ${aws_secrets:secret:password}
      scheduling:
        interval: "PT2H"
        index_read_count: 3
        start_time: "2023-06-02T22:01:30.00Z"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com"]
      aws:
          region: "us-east-1"
          #Uncomment the following lines if your destination is an OpenSearch Serverless collection
          #serverless: true
          # serverless_options:
          #     network_policy_name: "network-policy-name"
      index: "${getMetadata(\"opensearch-index\")}"
      document_id: "${getMetadata(\"opensearch-document_id\")}"
      enable_request_compression: true
      dlq:
        s3:
          bucket: "bucket-name"
          key_path_prefix: "apache-log-pipeline/logs/dlq"
          region: "us-east-1"
extension:
  aws:
    secrets:
      secret:
        secret_id: "my-opensearch-secret"
        region: "us-east-1"
        refresh_interval: PT1H
```

You can use a preconfigured blueprint to create this pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

# Use an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams
<a name="configure-client-kinesis"></a>

Use an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams to ingest stream records from multiple streams into Amazon OpenSearch Service domains and collections. The OpenSearch Ingestion pipeline incorporates streaming ingestion infrastructure to provide a high-scale, low-latency way to continuously ingest stream records from Kinesis.

**Topics**
+ [Amazon Kinesis Data Streams as a source](#confluent-cloud-kinesis)
+ [Amazon Kinesis Data Streams cross account as a source](#kinesis-cross-account-source)

## Amazon Kinesis Data Streams as a source
<a name="confluent-cloud-kinesis"></a>

The following procedure shows how to set up an OpenSearch Ingestion pipeline that uses Amazon Kinesis Data Streams as the data source. This section covers the necessary prerequisites, such as creating an OpenSearch Service domain or an OpenSearch Serverless collection, and walks through the steps to configure the pipeline role and create the pipeline.

### Prerequisites
<a name="s3-prereqs"></a>

To set up your pipeline, you need one or more active Kinesis data streams. These streams must either be receiving records or be ready to receive records from other sources. For more information, see [Overview of OpenSearch Ingestion](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html).

**To set up your pipeline**

1. 

**Create an OpenSearch Service domain or an OpenSearch Serverless collection**

   To create a domain or a collection, see [Getting started with OpenSearch Ingestion](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html).

   To create an IAM role with the correct permissions to write data to the collection or domain, see [Resource-based policies](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource).

1. 

**Configure the pipeline role with permissions**

   [Set up the pipeline role](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink) that you want to use in your pipeline configuration and add the following permissions to it. Replace the *placeholder values* with your own information.

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "allowReadFromStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:DescribeStreamConsumer",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards",
                   "kinesis:ListStreams",
                   "kinesis:ListStreamConsumers",
                   "kinesis:RegisterStreamConsumer",
                   "kinesis:SubscribeToShard"
               ],
               "Resource": [
                   "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
               ]
           }
       ]
   }
   ```

------

   If server-side encryption is enabled on the streams, the following AWS KMS policy allows the pipeline to decrypt the records. Replace the *placeholder values* with your own information.

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "allowDecryptionOfCustomManagedKey",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

   In order for a pipeline to write data to a domain, the domain must have a [domain-level access policy](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource) that allows the `sts_role_arn` pipeline role to access it.

   The following example is a domain access policy that allows the pipeline role that you created in the previous step (`pipeline-role`) to write data to the `ingestion-domain` domain. Replace the *placeholder values* with your own information.

   ```
   {
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
         },
         "Action": ["es:DescribeDomain", "es:ESHttp*"],
         "Resource": "arn:aws:es:AWS Region:account-id:domain/domain-name/*"
       }
     ]
   }
   ```

1. 

**Create the pipeline**

   Configure an OpenSearch Ingestion pipeline, specifying **Kinesis-data-streams** as the source. A ready-made blueprint is available in the OpenSearch Ingestion console for creating such a pipeline. (Optional) To create the pipeline using the AWS CLI, you can use the blueprint named **`AWS-KinesisDataStreamsPipeline`**. Replace the *placeholder values* with your own information.

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
           # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
           # json:
             # key_name: "logEvents"
             # These keys contain the metadata sent by CloudWatch Subscription Filters
             # in addition to the individual log events:
             # include_keys: [ 'owner', 'logGroup', 'logStream' ]
           newline:
         streams:
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
   
           # buffer_timeout: "1s"
           # records_to_accumulate: 100
           # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
           # consumer_strategy: "polling"
           # if consumer strategy is set to "polling", enable the polling config below.
           # polling:
             # max_polling_records: 100
             # idle_time_between_reads: "250ms"
         aws:
           # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the AWS Region of the Data Stream.
           region: "us-east-1"
   
     sink:
       - opensearch:
            # Provide an Amazon OpenSearch Service domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
            # Ensure a unique document ID by combining the available metadata attributes.
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the AWS Region of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update network policy for the serverless collection
               # network_policy_name: "network-policy-name"
            # Enable the 'distribution_version' setting if the destination is an Elasticsearch 6.x domain
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "kinesis-pipeline/logs/dlq"
               # Provide the region of the bucket.
               region: "us-east-1"
               # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

**Configuration options**  
For Kinesis configuration options, see [Configuration options](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kinesis/#configuration-options) in the *OpenSearch* documentation.

**Available metadata attributes**
   + **stream_name** – Name of the Kinesis data stream from which the record was ingested
   + **partition_key** – Partition key of the Kinesis data stream record being ingested
   + **sequence_number** – Sequence number of the Kinesis data stream record being ingested
   + **sub_sequence_number** – Subsequence number of the Kinesis data stream record being ingested

1. 

**(Optional) Configure recommended compute units (OCUs) for the Kinesis Data Streams pipeline**

   An OpenSearch Ingestion pipeline with a Kinesis Data Streams source can also ingest records from more than one stream. For faster ingestion, we recommend adding one additional compute unit for each stream that you add.

### Data consistency
<a name="confluent-cloud-kinesis-private"></a>

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When the pipeline reads stream records from Kinesis, it dynamically distributes the work of reading based on the shards associated with the streams. The pipeline automatically checkpoints a stream when it receives an acknowledgement that all of the stream's records were ingested into the OpenSearch domain or collection. This avoids duplicate processing of stream records.

To create the index based on the stream name, define the index in the `opensearch` sink section as `"index_${getMetadata(\"stream_name\")}"`.
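
The `index` and `document_id` expressions in the sample sink configuration resolve metadata attributes for each record. The following Python sketch illustrates that substitution; it is an illustration only (the stream name and key values are hypothetical), not the actual Data Prepper expression engine.

```python
def resolve_sink_fields(metadata: dict) -> dict:
    """Build the index name and document ID the way the sample sink expressions do."""
    index = f"index_{metadata['stream_name']}"
    document_id = "{partition_key}_{sequence_number}_{sub_sequence_number}".format(**metadata)
    return {"index": index, "document_id": document_id}

# Hypothetical per-record metadata attributes:
record_metadata = {
    "stream_name": "orders-stream",
    "partition_key": "customer-42",
    "sequence_number": "49590338271490256608559692538361571095921575989136588898",
    "sub_sequence_number": "0",
}
print(resolve_sink_fields(record_metadata)["index"])  # → index_orders-stream
```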

## Amazon Kinesis Data Streams cross account as a source
<a name="kinesis-cross-account-source"></a>

You can grant cross-account access with Amazon Kinesis Data Streams so that OpenSearch Ingestion pipelines can access Kinesis data streams in another account as a source. Complete the following steps to enable cross-account access:

**Configure cross-account access**

1. 

**Set the resource policy in the account that has the Kinesis stream**

   Replace the *placeholder values* with your own information.

------
#### [ JSON ]

****  

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "StreamReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
           },
           {
               "Sid": "StreamEFOReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
           }
       ]
   }
   ```

------

1. 

**(Optional) Set up a consumer and consumer resource policy**

   This step is optional and is only required if you plan to use the enhanced fan-out consumer strategy for reading stream records. For more information, see [Develop enhanced fan-out consumers with dedicated throughput](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html).

   1. 

**Set up a consumer**

      To reuse an existing consumer, you can skip this step. For more information, see [RegisterStreamConsumer](https://docs.aws.amazon.com/dms/latest/APIReference/API_RegisterStreamConsumer.html) in the *Amazon Kinesis Data Streams API Reference*.

      In the following example CLI command, replace the *placeholder values* with your own information.  
**Example CLI command**  

      ```
      aws kinesis register-stream-consumer \
      --stream-arn "arn:aws:kinesis:AWS Region:account-id:stream/stream-name" \
      --consumer-name consumer-name
      ```

   1. 

**Set up a consumer resource policy**

      In the following statement, replace the *placeholder values* with your own information.

------
#### [ JSON ]

****  

      ```
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "ConsumerEFOReadStatementID",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
                  },
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
              }
          ]
      }
      ```

------

1. 

**Pipeline Configuration**

   For cross-account ingestion, add the following attributes under `kinesis_data_streams` for each stream:
   + `stream_arn` – The ARN of the stream in the account where the stream exists.
   + `consumer_arn` – Optional. Required if you use the default enhanced fan-out consumer strategy. Specify the actual consumer ARN for this field. Replace the *placeholder values* with your own information.

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           newline:
         streams:
           - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
             consumer_arn: "consumer arn"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
           - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
             consumer_arn: "consumer arn"
             # initial_position: "EARLIEST"

         # buffer_timeout: "1s"
         # records_to_accumulate: 100
         # Change the consumer strategy to "polling". The default consumer strategy uses the enhanced "fan-out" supported by Kinesis Data Streams.
         # consumer_strategy: "polling"
         # If the consumer strategy is set to "polling", enable the polling config below.
         # polling:
           # max_polling_records: 100
           # idle_time_between_reads: "250ms"
         aws:
           # Provide the role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the AWS Region of the stream.
           region: "us-east-1"

     sink:
       - opensearch:
           # Provide an Amazon OpenSearch Service domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
           # Map the document ID from the partition key, sequence number, and subsequence number metadata attributes
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the AWS Region of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update a network policy for the serverless collection
               # network_policy_name: "network-policy-name"
           # Enable the 'distribution_version' setting if the destination is an Elasticsearch 6.x domain
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an Amazon S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
               # Provide the AWS Region of the bucket.
               region: "us-east-1"
               # Provide a role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

1. 

**OpenSearch Ingestion pipeline role for Kinesis Data Streams**

   1. 

**IAM Policy**

      Add the following policy to the pipeline role. Replace the *placeholder values* with your own information.

------
#### [ JSON ]

****  

      ```
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": [
                  "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              },
              {
                  "Sid": "allowReadFromStream",
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStream",
                      "kinesis:DescribeStreamSummary",
                      "kinesis:GetRecords",
                      "kinesis:GetShardIterator",
                      "kinesis:ListShards",
                      "kinesis:ListStreams",
                      "kinesis:ListStreamConsumers",
                      "kinesis:RegisterStreamConsumer"
                  ],
                  "Resource": [
                      "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              }
          ]
      }
      ```

------

   1. 

**Trust Policy**

      To ingest data from the stream account, you must establish a trust relationship between the pipeline role and the stream account. Add the following trust policy to the pipeline role. Replace the *placeholder values* with your own information.

------
#### [ JSON ]

****  

      ```
      {
        "Version": "2012-10-17",
        "Statement": [{
           "Effect": "Allow",
           "Principal": {
             "AWS": "arn:aws:iam::111122223333:root"
            },
           "Action": "sts:AssumeRole"
        }]
      }
      ```

------
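
Before deploying a cross-account configuration, it can help to verify that each `consumer_arn` actually belongs to its `stream_arn`: a Kinesis consumer ARN is the stream ARN followed by a `/consumer/<name>:<timestamp>` suffix. The following is a small, hypothetical helper sketch with placeholder ARNs.

```python
def consumer_matches_stream(stream_arn: str, consumer_arn: str) -> bool:
    """True if the consumer ARN is scoped under the given stream ARN."""
    return consumer_arn.startswith(stream_arn + "/consumer/")

# Placeholder ARNs matching the examples in this section:
stream = "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
consumer = stream + "/consumer/consumer-name:1700000000"
print(consumer_matches_stream(stream, consumer))  # → True
```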

## Next steps
<a name="configure-client-next"></a>

After you send data to a pipeline, you can [query it](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/searching.html) from the OpenSearch Service domain that is configured as a sink for the pipeline. The following resources can help you get started:
+ [Observability in Amazon OpenSearch Service](observability.md)
+ [Discover Traces](observability-analyze-traces.md)

# Using an OpenSearch Ingestion pipeline with AWS Lambda
<a name="configure-client-lambda"></a>

Use the [AWS Lambda processor](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/aws-lambda/) to enrich data from any source or destination supported by OpenSearch Ingestion using custom code. With the Lambda processor, you can apply your own data transformations or enrichments and then return the processed events back to your pipeline for further processing. This processor enables customized data processing and gives you full control over how data is manipulated before it moves through the pipeline.

**Note**  
The payload size limit for a single event processed by a Lambda processor is 5 MB. Additionally, the Lambda processor only supports responses in JSON array format.

## Prerequisites
<a name="configure-clients-lambda-prereqs"></a>

Before you create a pipeline with a Lambda processor, create the following resources:
+ An AWS Lambda function that enriches and transforms your source data. For instructions, see [Create your first Lambda function](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html).
+ An OpenSearch Service domain or OpenSearch Serverless collection that will be the pipeline sink. For more information, see [Creating OpenSearch Service domains](createupdatedomains.md#createdomains) and [Creating collections](serverless-create.md).
+ A pipeline role that includes permissions to write to the domain or collection sink. For more information, see [Pipeline role](pipeline-security-overview.md#pipeline-security-sink).

  The pipeline role also needs an attached permissions policy that allows it to invoke the Lambda function specified in the pipeline configuration. For example: 

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "allowinvokeFunction",
              "Effect": "Allow",
              "Action": [
                  "lambda:invokeFunction",
                  "lambda:InvokeAsync",
                  "lambda:ListFunctions"
              ],
              "Resource": "arn:aws:lambda:us-east-1:111122223333:function:function-name"
              
          }
      ]
  }
  ```

------

## Create a pipeline
<a name="configure-clients-security-lake-pipeline-role"></a>

To use AWS Lambda as a processor, configure an OpenSearch Ingestion pipeline and specify `aws_lambda` as a processor. You can also use the **AWS Lambda custom enrichment** blueprint to create the pipeline. For more information, see [Working with blueprints](pipeline-blueprint.md).

The following example pipeline receives data from an HTTP source, enriches it using a date processor and the AWS Lambda processor, and ingests the processed data to an OpenSearch domain.

```
version: "2"
lambda-processor-pipeline:
  source:
    http:
      path: "/${pipelineName}/logs"
  processor:
      - date:
        destination: "@timestamp"
        from_time_received: true
    - aws_lambda:
        function_name: "my-lambda-function"

        tags_on_failure: ["lambda_failure"]
        batch:
            key_name: "events"
        aws:
          region: region
  sink:
    - opensearch:
        hosts: [ "https://search-mydomain.us-east-1es.amazonaws.com" ]
        index: "table-index"
        aws:
          region: "region"
          serverless: false
```
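With this configuration, a client sends a JSON array of events to the pipeline's ingestion endpoint at the HTTP source path (see Constructing the ingestion endpoint). The following sketch builds such a request; the endpoint host is hypothetical, and the SigV4-signed POST itself is left as a comment so the snippet stays offline:

```python
import json

# The host comes from the pipeline's Ingestion URL; the path is the source's
# "path" option with ${pipelineName} substituted.
ingestion_host = "https://lambda-processor-pipeline-abcdefg.us-east-1.osis.amazonaws.com"
path = "/lambda-processor-pipeline/logs"
url = ingestion_host + path

# The HTTP source expects a JSON array of events in the request body.
body = json.dumps([{"message": "hello"}, {"message": "world"}])

# Send with a SigV4-signed POST using service name "osis", for example via
# botocore's SigV4Auth plus an HTTP client of your choice (not shown here).
print(url)
```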

The following example AWS Lambda function transforms incoming data by adding a new key-value pair (`"transformed": "true"`) to each element in the provided array of events, and then sends back the modified version.

```
def lambda_handler(event, context):
    # "events" matches the key_name configured on the processor's batch option.
    input_array = event.get('events', [])
    output = []
    for item in input_array:
        item["transformed"] = "true"
        output.append(item)

    return output
```
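You can exercise the handler locally before deploying it. The sketch below re-declares the handler so the snippet is self-contained, then invokes it with a payload shaped like the processor's batch (the `events` key mirrors the `key_name` in the pipeline configuration):

```python
# Redeclared copy of the handler above, so this snippet runs standalone.
def lambda_handler(event, context):
    output = []
    for item in event.get('events', []):
        item["transformed"] = "true"
        output.append(item)
    return output

# A payload shaped like the batch the aws_lambda processor sends.
sample_batch = {"events": [{"message": "log line 1"}, {"message": "log line 2"}]}
result = lambda_handler(sample_batch, None)
print(result)
# Each event comes back with "transformed": "true" added.
```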

## Batching
<a name="configure-clients-lambda-batching"></a>

Pipelines send batched events to the Lambda processor and dynamically adjust the batch size to keep it below the 5 MB payload limit.

The following example shows the `batch` option in the pipeline configuration, along with the corresponding line in the Lambda handler that reads the batched events:

```
batch:
    key_name: "events"

input_array = event.get('events', [])
```

**Note**  
When you create a pipeline, make sure the `key_name` option in the Lambda processor configuration matches the event key in the Lambda handler.

## Conditional filtering
<a name="configure-clients-lambda-conditional-filtering"></a>

Conditional filtering allows you to control when your AWS Lambda processor invokes the Lambda function based on specific conditions in event data. This is particularly useful when you want to selectively process certain types of events while ignoring others.

The following example configuration uses conditional filtering:

```
processor:
  - aws_lambda:
      function_name: "my-lambda-function"
      aws:
        region: "region"
      lambda_when: '/sourceIp == "10.10.10.10"'
```
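The effect of `lambda_when` can be illustrated with a short Python sketch. This models the behavior only (it is not Data Prepper's implementation): events matching the condition are sent to the function, and non-matching events pass through the processor unchanged.

```python
# Models lambda_when semantics: only matching events are sent to the
# Lambda function; the rest pass through unchanged.
def apply_lambda_when(events, condition, invoke):
    output = []
    for event in events:
        if condition(event):
            output.extend(invoke([event]))
        else:
            output.append(event)
    return output

events = [{"sourceIp": "10.10.10.10"}, {"sourceIp": "192.0.2.1"}]
result = apply_lambda_when(
    events,
    condition=lambda e: e.get("sourceIp") == "10.10.10.10",
    # Stand-in for the Lambda invocation from the earlier handler example.
    invoke=lambda batch: [dict(e, transformed="true") for e in batch],
)
print(result)
# Only the first event gains "transformed": "true".
```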