

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 從 Amazon Kinesis Data Streams 中載入串流資料
<a name="integrations-kinesis"></a>

您可以將串流資料從 Kinesis Data Streams 中載入至 OpenSearch Service。送達資料串流的新資料會觸發 Lambda 的事件通知，然後執行您的自訂程式碼以執行索引。本節包括一些簡單的 Python 範本程式碼。

## 先決條件
<a name="integrations-kinesis-lambda-prereq"></a>

繼續之前，您必須準備好以下資源。


| 先決條件 | Description | 
| --- | --- | 
| Amazon Kinesis Data Stream | Lambda 函數的事件來源。如需進一步了解，請參閱 [Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。 | 
| OpenSearch Service 網域 | 您的 Lambda 函數處理資料後的資料目的地。如需詳細資訊，請參閱 [建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)。 | 
| IAM 角色 |  此角色必須擁有基本 OpenSearch Service、Kinesis 和 Lambda 許可，例如下列項目：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "*"
    }
  ]
}
```     角色必須具有下列信任關係：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```     如需進一步了解，請參閱 *IAM 使用者指南*中的[建立 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。  | 

# 建立 Lambda 函式
<a name="integrations-kinesis-lambda"></a>

遵循[建立 Lambda 部署套件](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的指示，但要建立名為 `kinesis-to-opensearch` 的目錄，並使用以下適用於 `sample.py` 的程式碼：

```
import base64
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-kine-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        id = record['eventID']
        timestamp = record['kinesis']['approximateArrivalTimestamp']

        # Kinesis data is base64-encoded, so decode here
        message = base64.b64decode(record['kinesis']['data'])

        # Create the JSON document
        document = { "id": id, "timestamp": timestamp, "message": message }
        # Index the document
        r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return 'Processed ' + str(count) + ' items.'
```

編輯 `region` 和 `host` 的變數。

[安裝 pip](https://pip.pypa.io/en/stable/installation/) (如果您尚未安裝的話)，然後使用下列命令安裝相依項目：

```
cd kinesis-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然後，遵循[建立 Lambda 函式](integrations-s3-lambda.md#integrations-s3-lambda-create)中的指示，但要從[先決條件](integrations-kinesis.md#integrations-kinesis-lambda-prereq)指定 IAM 角色和下列用於觸發的設定：
+ **Kinesis 串流**：您的 Kinesis 串流
+ **批次大小**：100
+ **開始位置**：水平修剪

如需進一步了解，請參閱 *Amazon Kinesis Data Streams 開發人員指南* 中的[什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/working-with-kinesis.html)。

此時，您有一整組的資源：Kinesis 資料串流、在串流收到新資料並索引該資料後執行的函數，以及可搜尋和視覺化的 OpenSearch Service 網域。

# 測試 Lambda 函數
<a name="integrations-kinesis-testing"></a>

在建立函數後，您可以做測試，方法是將新記錄加入到使用 AWS CLI的資料串流：

```
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1
```

然後使用 OpenSearch Service 主控台或 OpenSearch Dashboards 來確認 `lambda-kine-index` 包含一個文件。您也可以使用以下請求：

```
GET https://domain-name/lambda-kine-index/_search
{
  "hits" : [
    {
      "_index": "lambda-kine-index",
      "_type": "_doc",
      "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
      "_score": 1,
      "_source": {
        "timestamp": 1523648740.051,
        "message": "My test data.",
        "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
      }
    }
  ]
}
```