

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 Amazon Kinesis Data Streams 使用 OpenSearch 擷取管道
<a name="configure-client-kinesis"></a>

搭配 Amazon Kinesis Data Streams 使用 OpenSearch Ingestion 管道，將來自多個串流的資料擷取至 Amazon OpenSearch Service 網域和集合。OpenSearch 擷取管道整合串流擷取基礎設施，以提供從 Kinesis 持續擷取串流記錄的高規模、低延遲方式。

**Topics**
+ [Amazon Kinesis Data Streams 作為來源](#confluent-cloud-kinesis)
+ [Amazon Kinesis Data Streams 跨帳戶做為來源](#kinesis-cross-account-source)

## Amazon Kinesis Data Streams 作為來源
<a name="confluent-cloud-kinesis"></a>

透過下列程序，您將了解如何設定使用 Amazon Kinesis Data Streams 做為資料來源的 OpenSearch Ingestion 管道。本節涵蓋必要的先決條件，例如建立 OpenSearch Service 網域或 OpenSearch Serverless Collection，以及逐步完成設定管道角色和建立管道的步驟。

### 先決條件
<a name="s3-prereqs"></a>

若要設定管道，您需要一或多個作用中的 Kinesis Data Streams。這些串流必須正在接收記錄或準備好接收來自其他來源的記錄。如需詳細資訊，請參閱 [ OpenSearch 擷取概觀](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)。

**設定管道**

1. 

**建立 OpenSearch Service 網域或 OpenSearch Serverless 集合**

   若要建立網域或集合，請參閱 [ OpenSearch Ingestion 入門](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/osis-getting-started-tutorials.html)。

   若要建立具有正確許可的 IAM 角色，以存取寫入資料至集合或網域，請參閱以[資源為基礎的政策](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)。

1. 

**設定具有 許可的管道角色**

   [設定您想要在管道組態中使用的管道角色](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-security-overview.html#pipeline-security-sink)，並將下列許可新增至管道組態。使用您的資訊取代*預留位置的值*。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowReadFromStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStream",
                   "kinesis:DescribeStreamConsumer",
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards",
                   "kinesis:ListStreams",
                   "kinesis:ListStreamConsumers",
                   "kinesis:RegisterStreamConsumer",
                   "kinesis:SubscribeToShard"
               ],
               "Resource": [
                   "arn:aws:kinesis:us-east-1:111122223333:stream/stream-name"
               ]
           }
       ]
   }
   ```

------

   如果在串流上啟用伺服器端加密，下列 AWS KMS 政策允許 解密記錄。使用您的資訊取代*預留位置的值*。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "allowDecryptionOfCustomManagedKey",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt",
                   "kms:GenerateDataKey"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/key-id"
           }
       ]
   }
   ```

------

   為了讓管道將資料寫入網域，網域必須具有[網域層級存取政策](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/ac.html#ac-types-resource)，允許 **sts\$1role\$1arn** 管道角色存取它。

   下列範例是網域存取政策，允許在上一個步驟 (`pipeline-role`) 中建立的管道角色將資料寫入`ingestion-domain`網域。使用您的資訊取代*預留位置的值*。

   ```
   {
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::your-account-id:role/pipeline-role"
         },
         "Action": ["es:DescribeDomain", "es:ESHttp*"],
         "Resource": "arn:aws:es:AWS 區域:account-id:domain/domain-name/*"
       }
     ]
   }
   ```

1. 

**建立管道**

   設定指定 **Kinesis-data-streams** 做為來源的 OpenSearch Ingestion 管道。您可以在 OpenSearch 擷取主控台中找到可用於建立此類管道的準備就緒藍圖。（選用） 若要使用 建立管道 AWS CLI，您可以使用名為 "**`AWS-KinesisDataStreamsPipeline`**" 的藍圖。使用您的資訊取代*預留位置的值*。

   ```
   version: "2"
   kinesis-pipeline:
     source:
       kinesis_data_streams:
         acknowledgments: true
         codec:
           # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records.
           # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch.
           # json:
             # key_name: "logEvents"
             # These keys contain the metadata sent by CloudWatch Subscription Filters
             # in addition to the individual log events:
             # include_keys: [ 'owner', 'logGroup', 'logStream' ]
           newline:
         streams:
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
           - stream_name: "stream name"
             # Enable this if ingestion should start from the start of the stream.
             # initial_position: "EARLIEST"
             # checkpoint_interval: "PT5M"
             # Compression will always be gzip for CloudWatch, but will vary for other sources:
             # compression: "gzip"
   
           # buffer_timeout: "1s"
           # records_to_accumulate: 100
           # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
           # consumer_strategy: "polling"
           # if consumer strategy is set to "polling", enable the polling config below.
           # polling:
             # max_polling_records: 100
             # idle_time_between_reads: "250ms"
         aws:
           # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com
           sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
           # Provide the AWS 區域 of the Data Stream.
           region: "us-east-1"
   
     sink:
       - opensearch:
           # Provide an Amazon OpenSearch Serverless domain endpoint
           hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
           index: "index_${getMetadata(\"stream_name\")}"
           # Ensure adding unique document id as a combination of the metadata attributes available.
           document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
           aws:
             # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
             sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
             # Provide the AWS 區域 of the domain.
             region: "us-east-1"
             # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
             serverless: false
             # serverless_options:
               # Specify a name here to create or update network policy for the serverless collection
               # network_policy_name: "network-policy-name"
           # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
           # distribution_version: "es6"
           # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
           # enable_request_compression: true/false
           # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
           dlq:
             s3:
               # Provide an S3 bucket
               bucket: "your-dlq-bucket-name"
               # Provide a key path prefix for the failed requests
               # key_path_prefix: "kinesis-pipeline/logs/dlq"
               # Provide the region of the bucket.
               region: "us-east-1"
               # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
               sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

**組態選項**  
如需 Kinesis 組態選項，請參閱 *OpenSearch* 文件中的[組態選項](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kinesis/#configuration-options)。

**可用的中繼資料屬性**
   + **stream\$1name** – 從中擷取記錄的 Kinesis Data Streams 名稱
   + **partition\$1key** – 正在擷取的 Kinesis Data Streams 記錄的分割區索引鍵
   + **sequence\$1number** – 正在擷取的 Kinesis Data Streams 記錄序號
   + **sub\$1sequence\$1number** – 正在擷取的 Kinesis Data Streams 記錄的子序號

1. 

**（選用） 設定 Kinesis Data Streams 管道的建議運算單位 (OCUs)**

   OpenSearch Kinesis Data Streams 來源管道也可以設定為從多個串流擷取串流記錄。為了加快擷取速度，我們建議您為每個新增的串流新增額外的運算單位。

### 資料一致性
<a name="confluent-cloud-kinesis-private"></a>

OpenSearch Ingestion end-to-end確認，以確保資料耐久性。當管道從 Kinesis 讀取串流記錄時，它會根據與串流相關聯的碎片動態分配讀取串流記錄的工作。在擷取 OpenSearch 網域或集合中的所有記錄之後，管道會在收到確認時自動檢查點串流。這可避免重複處理串流記錄。

若要根據串流名稱建立索引，請在 opensearch sink 區段中將索引定義為 **"index\$1\$1\$1getMetadata(\$1"stream\$1name\$1")\$1"**。

## Amazon Kinesis Data Streams 跨帳戶做為來源
<a name="kinesis-cross-account-source"></a>

您可以使用 Amazon Kinesis Data Streams 跨帳戶授予存取權，以便 OpenSearch Ingestion 管道可以存取另一個帳戶中的 Kinesis Data Streams 作為來源。完成下列步驟以啟用跨帳戶存取：

**設定跨帳戶存取**

1. 

**在具有 Kinesis 串流的帳戶中設定資源政策**

   使用您的資訊取代*預留位置的值*。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "StreamReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetRecords",
                   "kinesis:GetShardIterator",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name"
           },
           {
               "Sid": "StreamEFOReadStatementID",
               "Effect": "Allow",
               "Principal": {
                   "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
               },
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:ListShards"
               ],
               "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-name/consumer/consumer-name"
           }
       ]
   }
   ```

------

1. 

**（選用） 設定消費者和消費者資源政策**

   這是選用步驟，只有在您計劃使用增強型廣發消費者策略來讀取串流記錄時才需要。如需詳細資訊，請參閱[使用專用輸送量開發增強型廣發消費者](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html)。

   1. 

**設定取用者**

      若要重複使用現有的消費者，您可以略過此步驟。如需詳細資訊，請參閱《*Amazon Kinesis Data Streams API 參考*》中的 [RegisterStreamConsumer](https://docs.aws.amazon.com/dms/latest/APIReference/API_RegisterStreamConsumer.html)。

      在下列範例 CLI 命令中，將*預留位置值*取代為您自己的資訊。  
**Example ：CLI 命令範例**  

      ```
      aws kinesis register-stream-consumer \
      --stream-arn "arn:aws:kinesis:AWS 區域:account-id:stream/stream-name" \
      --consumer-name consumer-name
      ```

   1. 

**設定消費者資源政策**

      在下列陳述式中，將*預留位置值*取代為您自己的資訊。

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Sid": "ConsumerEFOReadStatementID",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::111122223333:role/Pipeline-Role"
                  },
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": "arn:aws:kinesis:us-east-1:444455556666:stream/stream-1/consumer/consumer-name"
              }
          ]
      }
      ```

------

1. 

**管道組態**

   對於跨帳戶擷取，`kinesis_data_streams`請為每個串流在 下新增下列屬性：
   + `stream_arn` - 屬於串流所在帳戶之串流的 arn
   + `consumer_arn` - 這是選用屬性，如果選擇預設增強型廣發消費者策略，則必須指定此屬性。指定此欄位的實際消費者來源。使用您的資訊取代*預留位置的值*。

   ```
   version: "2"
        kinesis-pipeline:
          source:
            kinesis_data_streams:
              acknowledgments: true
              codec:
                newline:
              streams:
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                  # Enable this if ingestion should start from the start of the stream.
                  # initial_position: "EARLIEST"
                  # checkpoint_interval: "PT5M"
                - stream_arn: "arn:aws:kinesis:region:stream-account-id:stream/stream-name"
                  consumer_arn: "consumer arn"
                   # initial_position: "EARLIEST"
        
                # buffer_timeout: "1s"
                # records_to_accumulate: 100
                # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS.
                # consumer_strategy: "polling"
                # if consumer strategy is set to "polling", enable the polling config below.
                # polling:
                  # max_polling_records: 100
                  # idle_time_between_reads: "250ms"
              aws:
                # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                # Provide the AWS 區域 of the domain.
                region: "us-east-1"
        
          sink:
            - opensearch:
                # Provide an OpenSearch Serverless domain endpoint
                hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
                index: "index_${getMetadata(\"stream_name\")}"
                # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes
                document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}"
                aws:
                  # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
                  sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
                  # Provide the AWS 區域 of the domain.
                  region: "us-east-1"
                  # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
                  serverless: false
                    # serverless_options:
                    # Specify a name here to create or update network policy for the serverless collection
                  # network_policy_name: network-policy-name
                # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x
                # distribution_version: "es6"
                # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
                # enable_request_compression: true/false
                # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ.
                dlq:
                  s3:
                    # Provide an Amazon S3 bucket
                    bucket: "your-dlq-bucket-name"
                    # Provide a key path prefix for the failed requests
                    # key_path_prefix: "alb-access-log-pipeline/logs/dlq"
                    # Provide the AWS 區域 of the bucket.
                    region: "us-east-1"
                    # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                    sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role"
   ```

1. 

**OSI 管道角色 Kinesis 資料串流**

   1. 

**IAM 政策**

      將下列政策新增至管道角色。使用您的資訊取代*預留位置的值*。

------
#### [ JSON ]

****  

      ```
      {
          "Version":"2012-10-17",		 	 	 
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStreamConsumer",
                      "kinesis:SubscribeToShard"
                  ],
                  "Resource": [
                  "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              },
              {
                  "Sid": "allowReadFromStream",
                  "Effect": "Allow",
                  "Action": [
                      "kinesis:DescribeStream",
                      "kinesis:DescribeStreamSummary",
                      "kinesis:GetRecords",
                      "kinesis:GetShardIterator",
                      "kinesis:ListShards",
                      "kinesis:ListStreams",
                      "kinesis:ListStreamConsumers",
                      "kinesis:RegisterStreamConsumer"
                  ],
                  "Resource": [
                      "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"
                  ]
              }
          ]
      }
      ```

------

   1. 

**信任政策**

      若要從串流帳戶擷取資料，您需要在管道擷取角色和串流帳戶之間建立信任關係。將下列項目新增至管道角色。使用您的資訊取代*預留位置的值*。

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [{
           "Effect": "Allow",
           "Principal": {
             "AWS": "arn:aws:iam::111122223333:root"
            },
           "Action": "sts:AssumeRole"
        }]
      }
      ```

------