

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Amaz OpenSearch on DynamoDB 上使用采集管道
<a name="configure-client-ddb"></a>

您可以使用 D [ynamoDB](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/dynamo-db/) 插件将表事件（例如创建、更新和删除）流式传输到亚马逊服务域和 OpenSearch 亚马逊 OpenSearch 无服务器集合。该管道采用变更数据捕获（CDC）技术实现大规模、低延迟的流式传输。

无论有没有完整初始快照，都可以处理 DynamoDB 数据。
+ **有了完整快照**，DynamoDB [point-in-time 使用](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery.html)恢复 (PITR) 来创建备份并将其上传到 Amazon S3。 OpenSearch 然后，Ingestion 会将快照索引到一个或多个 OpenSearch索引中。为了保持一致性，管道会将所有 DynamoDB 更改与同步。 OpenSearch此选项要求同时启用 PITR 和 [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.CoreComponents.html#HowItWorks.CoreComponents.Streams)。
+ **没有快照** — OpenSearch Ingestion 仅流式传输新的 DynamoDB 事件。如果您已有快照或者需要在无历史数据的情况下进行实时流式传输，则选择此选项。此选项要求仅启用 DynamoDB Streams。

*有关更多信息，请参阅《开发者指南》[中的 DynamoDB 零 ETL 与 OpenSearch 亚马逊服务的集成](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/OpenSearchIngestionForDynamoDB.html)。Amazon DynamoDB *

**Topics**
+ [先决条件](#s3-prereqs)
+ [步骤 1：配置管道角色](#ddb-pipeline-role)
+ [步骤 2：创建管道](#ddb-pipeline)
+ [数据一致性](#ddb-pipeline-consistency)
+ [映射数据类型](#ddb-pipeline-mapping)
+ [限制](#ddb-pipeline-limitations)
+ [DynamoDB 的推荐 CloudWatch 警报](#ddb-pipeline-metrics)

## 先决条件
<a name="s3-prereqs"></a>

要设置管道，您必须有一个已启用 DynamoDB Streams 的 DynamoDB 表。您的流应使用 `NEW_IMAGE` 流视图类型。但是，`NEW_AND_OLD_IMAGES`如果这种流视图类型适合您的用例， OpenSearch Ingestion 管道也可以使用流式传输事件。

如果您使用的是快照，则还必须在表上启用 point-in-time恢复。有关更多信息，请参阅 *Amazon DynamoD* B 开发者[指南中的[创建表](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.CreateTable)、[启用 point-in-time恢复](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/PointInTimeRecovery_Howitworks.html#howitworks_enabling)和启用流](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)。

## 步骤 1：配置管道角色
<a name="ddb-pipeline-role"></a>

设置 DynamoDB 表后，[设置要在管道配置中使用的管道角色](pipeline-security-overview.md#pipeline-security-sink)，并在该角色中添加以下 DynamoDB 权限：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "allowRunExportJob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeTable",
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:ExportTableToPointInTime"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111122223333:table/my-table"
            ]
        },
        {
            "Sid": "allowCheckExportjob",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeExport"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111122223333:table/my-table/export/*"
            ]
        },
        {
            "Sid": "allowReadFromStream",
            "Effect": "Allow",
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:111122223333:table/my-table/stream/*"
            ]
        },
        {
            "Sid": "allowReadAndWriteToS3ForExport",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::amzn-s3-demo-bucket/export-folder/*"
            ]
        }
    ]
}
```

------

您也可以使用 AWS KMS 客户管理的密钥对导出数据文件进行加密。要解密导出的对象，请在管道的导出配置中指定 `s3_sse_kms_key_id` 作为密钥 ID，格式如下：`arn:aws:kms:region:account-id:key/my-key-id`。以下策略包括使用客户自主管理型密钥时的必需权限：

```
{
    "Sid": "allowUseOfCustomManagedKey",
    "Effect": "Allow",
    "Action": [
        "kms:GenerateDataKey",
        "kms:Decrypt"
    ],
    "Resource": arn:aws:kms:region:account-id:key/my-key-id
}
```

## 步骤 2：创建管道
<a name="ddb-pipeline"></a>

然后，您可以配置如下所示的 OpenSearch 采集管道，将 DynamoDB 指定为来源。此示例管道使用 PITR 快照从 `table-a` 中摄取数据，然后从 DynamoDB Streams 摄取事件。`LATEST` 的起始位置指示管道应从 DynamoDB Streams 读取最新数据。

```
version: "2"
cdc-pipeline:
  source:
    dynamodb:
      tables:
      - table_arn: "arn:aws:dynamodb:region:account-id:table/table-a"  
        export:
          s3_bucket: "my-bucket"
          s3_prefix: "export/"
        stream:
          start_position: "LATEST"
      aws:
        region: "us-east-1"
  sink:
  - opensearch:
      hosts: ["https://search-mydomain.region.es.amazonaws.com"]
      index: "${getMetadata(\"table-name\")}"
      index_type: custom
      normalize_index: true
      document_id: "${getMetadata(\"primary_key\")}"
      action: "${getMetadata(\"opensearch_action\")}"
      document_version: "${getMetadata(\"document_version\")}"
      document_version_type: "external"
```

您可以使用预先配置的 DynamoDB 蓝图，以创建此管道。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

## 数据一致性
<a name="ddb-pipeline-consistency"></a>

OpenSearch Ingestion 支持 end-to-end确认，以确保数据的持久性。管道读取快照或流时，它会动态创建分区以进行并行处理。当管道在摄取 OpenSearch 域或集合中的所有记录后收到确认信息时，该管道会将该分区标记为已完成。

如果要收录到 OpenSearch 无服务器*搜索*集合中，可以在管道中生成文档 ID。如果要采集到 OpenSearch 无服务器*时间序列*集合中，请注意该管道不会生成文档 ID。

In OpenSearch gestion 管道还将传入的事件操作映射到相应的批量索引操作中，以帮助采集文档。这样可以保持数据的一致性，因此 DynamoDB 中的每一次数据更改都与中的相应文档更改保持一致。 OpenSearch

## 映射数据类型
<a name="ddb-pipeline-mapping"></a>

OpenSearch 服务将每个传入文档中的数据类型动态映射到 DynamoDB 中的相应数据类型。下表显示了 S OpenSearch ervice 如何自动映射各种数据类型。


| 数据类型 | OpenSearch | DynamoDB | 
| --- | --- | --- | 
| 数字 |  OpenSearch 自动映射数值数据。如果该数字是整数，则将其 OpenSearch 映射为长值。如果数字是小数，则将其 OpenSearch 映射为浮点值。 OpenSearch 根据第一个发送的文档动态映射各种属性。如果您在 DynamoDB 中为同一属性混合了多种数据类型（例如整数和小数），则映射可能会失败。 例如，如果您的第一个文档的属性为整数，而后来的文档具有与小数相同的属性， OpenSearch 则无法采集第二个文档。在这些情况下，应提供一个显式的映射模板，如下所示： <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "MixedNumberAttribute": {<br />     "type": "float"<br />    }<br />   }<br />  }<br /> }<br />}</pre> 如果需要双精度，请使用字符串类型字段映射。不存在支持 38 位精度的等效数字类型 OpenSearch。  |  DynamoDB 支持[数字](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Number)。  | 
| 数字集 | OpenSearch 自动将数字集映射到由长值或浮点值组成的数组中。与标量数字一样，这取决于摄取的第一个数字是整数还是小数。您可以像映射标量字符串一样提供数字集的映射。 |  DynamoDB 支持表示[数字集](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes)的类型。  | 
| 字符串 |  OpenSearch 自动将字符串值映射为文本。在某些情况下（例如枚举值），您可以映射到关键字类型。 以下示例说明如何将`PartType`名为的 DynamoDB 属性映射到关键字。 OpenSearch  <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "PartType": {<br />     "type": "keyword"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  |  DynamoDB 支持[字符串](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.String)。  | 
| 字符串集 |  OpenSearch 自动将字符串集映射到字符串数组中。您可以像映射标量字符串一样提供字符串集的映射。  | DynamoDB 支持表示[字符串集](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes)的类型。 | 
| 二元 |  OpenSearch 自动将二进制数据映射为文本。您可以提供映射以将它们写成二进制字段 OpenSearch。 以下示例说明如何将`ImageData`名为的 DynamoDB 属性映射到 OpenSearch 二进制字段。 <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "ImageData": {<br />     "type": "binary"<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | DynamoDB 支持[二进制类型属性](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Binary)。 | 
| 二进制集 |  OpenSearch 自动将二进制集作为文本映射到二进制数据数组中。您可以像映射标量二进制一样提供数字集的映射。  | DynamoDB 支持表示[二进制值集](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes)的类型。 | 
| 布尔值 |  OpenSearch 将 DynamoDB 布尔类型映射为 OpenSearch 布尔类型。  |  DynamoDB 支持[布尔类型属性](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Boolean)。  | 
| Null |  OpenSearch 可以采集 DynamoDB 空类型的文档。它将该值作为空值保存在文档中。此类型没有映射，并且此字段未编制索引或不可搜索。 如果对空类型使用相同的属性名称，然后更改为其他类型（例如字符串），则会为第一个非空值 OpenSearch 创建动态映射。后续值仍然可以是 DynamoDB 空值。  | DynamoDB 支持[空类型属性](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Null)。 | 
| Map |  OpenSearch 将 DynamoDB 映射属性映射到嵌套字段。嵌套字段内也适用相同的映射。 以下示例将嵌套字段中的字符串映射到中的关键字类型 OpenSearch： <pre>{<br /> "template": {<br />  "mappings": {<br />   "properties": {<br />    "AdditionalDescriptions": {<br />     "properties": {<br />      "PartType": {<br />       "type": "keyword"<br />      }<br />     }<br />    }<br />   }<br />  }<br /> }<br />}</pre>  | DynamoDB 支持[映射类型属性](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Document.Map)。 | 
| 列表 |  OpenSearch 根据列表中的内容，为 DynamoDB 列表提供了不同的结果。 当列表包含所有相同类型的标量类型（例如，所有字符串的列表）时，则将该列表作为 OpenSearch 该类型的数组提取。这适用于字符串、数字、布尔值和空类型。其中每种类型的限制与该类型标量的限制相同。 您还可以使用与映射相同的映射，为映射列表提供映射。 您无法提供混合类型的列表。  |  DynamoDB 支持[列表类型属性](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.Document.List)。  | 
| 设置 |  OpenSearch 根据 DynamoDB 集合中的内容，为 DynamoDB 集提供不同的结果。 当一个集合包含所有相同类型的标量类型（例如，所有字符串的集合）时，则将该集合作为该类型的数组进行 OpenSearch 摄取。这适用于字符串、数字、布尔值和空类型。其中每种类型的限制与该类型标量的限制相同。 您还可以使用与映射相同的映射，为映射集合提供映射。 您无法提供混合类型的集合。  | DynamoDB 支持表示[集合](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes.SetTypes)的类型。 | 

我们建议您在摄取管道中配置死信队列 (DLQ)。 OpenSearch 如果您已配置队列，S OpenSearch ervice 会将所有因动态映射失败而无法载入的失败文档发送到队列。

如果自动映射失败，则可以在管道配置中使用 `template_type` 和 `template_content` 来定义显式映射规则。或者，您可以在启动管道之前直接在搜索域或集合中创建映射模板。

## 限制
<a name="ddb-pipeline-limitations"></a>

在为 DynamoDB 设置 OpenSearch 摄取管道时，请考虑以下限制：
+  OpenSearch 采集与 DynamoDB 的集成目前不支持跨区域接入。您的 DynamoDB 表 OpenSearch 和摄取管道必须处于相同的位置。 AWS 区域
+ 您的 DynamoDB 表 OpenSearch 和摄取管道必须处于相同的位置。 AWS 账户
+ 一个 OpenSearch 摄取管道仅支持一个 DynamoDB 表作为其来源。
+ DynamoDB Streams 仅在日志中存储最多 24 小时的数据。如果从大型表的初始快照中摄取数据需要 24 小时或更长时间，则会丢失一些初始数据。为了缓解这种数据丢失，请估计表的大小并配置适当的 OpenSearch 摄取管道计算单元。

## DynamoDB 的推荐 CloudWatch 警报
<a name="ddb-pipeline-metrics"></a>

建议使用以下 CloudWatch 指标来监控您的摄取管道的性能。这些指标可能有助您确定处理的导出数据量、处理的流事件量、处理导出和流事件时的错误数以及写入目标的文档数量。您可以设置 CloudWatch 警报，以便在其中一个指标在指定时间内超过指定值时执行操作。


| 指标 | 说明 | 
| --- |--- |
| dynamodb-pipeline.BlockingBuffer.bufferUsage.value |  指示正在使用的缓冲区数量。  | 
|  dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value  |  显示正在积极处理待导出 OCUs 的 Amazon S3 对象的总数。  | 
|  dynamodb-pipeline.dynamodb.bytesProcessed.count  |  处理的 DynamoDB 源字节数。  | 
|  dynamodb-pipeline.dynamodb.changeEventsProcessed.count  |  处理的 DynamoDB 流更改事件数量。  | 
|  dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count  |  处理的 DynamoDB 更改事件中的错误数。  | 
|  dynamodb-pipeline.dynamodb.exportJobFailure.count  | 失败的导出任务提交尝试次数。 | 
|  dynamodb-pipeline.dynamodb.exportJobSuccess.count  | 已成功提交的导出任务数量。 | 
|  dynamodb-pipeline.dynamodb.exportRecordsProcessed.count  |  处理的导出记录总数。  | 
|  dynamodb-pipeline.dynamodb.exportRecordsTotal.count  |  从 DynamoDB 导出的记录总数，这对于跟踪数据导出量至关重要。  | 
|  dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count  | 从 Amazon S3 成功处理的导出数据文件总数。 | 
|  dynamodb-pipeline.opensearch.bulkBadRequestErrors.count  | 批量请求期间由于请求格式错误而导致的错误计数。 | 
|  dynamodb-pipeline.opensearch.bulkRequestLatency.avg  | 向发出的批量写入请求的平均延迟 OpenSearch。 | 
|  dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count  | 由于找不到目标数据而失败的批量请求数。 | 
|  dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count  | 采集管道为写 OpenSearch 入集群而进行的重试次数。 OpenSearch | 
|  dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum  | 向发出的所有批量请求的总大小（以字节为单位） OpenSearch。 | 
|  dynamodb-pipeline.opensearch.documentErrors.count  | 向发送文档时出现的错误数 OpenSearch。导致错误的文件将发送给 DLQ。 | 
|  dynamodb-pipeline.opensearch.documentsSuccess.count  | 成功写入 OpenSearch 集群或集合的文档数。 | 
|  dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count  | 第一次尝试成功编入索引 OpenSearch 的文档数。 | 
|  `dynamodb-pipeline.opensearch.documentsVersionConflictErrors.count`  | 处理过程中由于文档版本冲突而导致的错误计数。 | 
|  `dynamodb-pipeline.opensearch.PipelineLatency.avg`  |  OpenSearch Ingestion 管道通过从源读取数据到写入目标来处理数据的平均延迟。 | 
|  dynamodb-pipeline.opensearch.PipelineLatency.max  | 通过从源读取数据到写入目标来处理数据的 OpenSearch Ingestion 管道的最大延迟。 | 
|  dynamodb-pipeline.opensearch.recordsIn.count  | 成功摄入 OpenSearch的记录数。该指标对于跟踪正在处理和存储的数据量至关重要。 | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count  | 未能写入 DLQ 的记录数。 | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count  | 写入 DLQ 的记录数。 | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count  | Amazon S3 死信队列请求的延迟测量次数。 | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum  | 向 Amazon S3 死信队列发出的所有请求的总延迟 | 
|  dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum  | 向 Amazon S3 死信队列发出的所有请求的总大小（以字节为单位）。 | 
|  dynamodb-pipeline.recordsProcessed.count  | 管道中处理的记录总数，这是衡量整体吞吐量的关键指标。 | 
|  dynamodb.changeEventsProcessed.count  | 没有从 DynamoDB 流收集任何记录。这可能是由于桌上没有活动、正在进行导出或访问 DynamoDB 直播时出现问题。 | 
|  `dynamodb.exportJobFailure.count`  | 尝试触发向 S3 的导出操作失败。 | 
|  `dynamodb-pipeline.opensearch.bulkRequestInvalidInputErrors.count`  |  OpenSearch 由于输入无效而导致的批量请求错误计数，这对于监控数据质量和操作问题至关重要。 | 
|  opensearch.EndToEndLatency.avg  | 端到端延迟高于从 DynamoDB 数据流读取所需的延迟。这可能是由于 OpenSearch 集群规模不足，或者管道 OCU 的最大容量对于 DynamoDB 表上的 WCU 吞吐量来说过低。这种端到端的延迟在导出后会很高，而且随着时间的推移，它会赶上最新的 DynamoDB 流，因此会随着时间的推移而降低。 | 