

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將資料串流載入至 Amazon OpenSearch Service
<a name="integrations"></a>

您可以使用 OpenSearch Ingestion 直接將[串流資料](https://aws.amazon.com/streaming-data/)載入 Amazon OpenSearch Service 網域，而不需要使用第三方解決方案。若要將資料傳送至 OpenSearch Ingestion，您可以設定資料生產者，而 服務會自動將資料交付至您指定的網域或集合。若要開始使用 OpenSearch Ingestion，請參閱 [教學課程：使用 Amazon OpenSearch Ingestion 將資料擷取至集合](osis-serverless-get-started.md)。

您仍然可以使用其他來源載入串流資料，例如 Amazon Data Firehose 和 Amazon CloudWatch Logs，這些資料內建支援 OpenSearch Service。諸如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB 等其他來源使用 AWS Lambda 函數作為事件處理程序。Lambda 函數透過處理新資料並將其串流到您的網域來對其進行回應。

**注意**  
Lambda 支援數種熱門的程式設計語言，並且可用於大部分 AWS 區域。如需詳細資訊，請參閱《 *AWS Lambda 開發人員指南*》中的 [Lambda 入門](https://docs.aws.amazon.com/lambda/latest/dg/lambda-app.html)和《》中的[AWS 服務端點](https://docs.aws.amazon.com/general/latest/gr/rande.html#lambda_region)*AWS 一般參考*。

# 從 OpenSearch Ingestion 載入串流資料
<a name="integrations-osis"></a>

您可以使用 Amazon OpenSearch Ingestion 將資料載入 OpenSearch Service 網域。您可以將資料生產者設定為將資料傳送至 OpenSearch Ingestion，並自動將資料交付至您指定的集合。您也可以設定 OpenSearch Ingestion，在交付資料之前轉換資料。如需詳細資訊，請參閱[Amazon OpenSearch 擷取概觀](ingestion.md)。

# 從 Amazon S3 載入串流資料
<a name="integrations-s3-lambda"></a>

您可以使用 Lambda 將資料從 Amazon S3 傳送到您的 OpenSearch Service 網域。送達 S3 儲存貯體的新資料會觸發 Lambda 的事件通知，然後執行您的自訂程式碼以執行索引。

這個串流資料方法非常靈活。您可以[ 索引物件中繼資料 ](https://aws.amazon.com/blogs/database/indexing-metadata-in-amazon-elasticsearch-service-using-aws-lambda-and-python/)，若是純文字的話，便可剖析和索引一些物件本體元素。本節包含一些簡單的 Python 範本程式碼，其使用規則表達式來剖析日誌檔案和索引比對。

## 先決條件
<a name="integrations-s3-lambda-prereq"></a>

繼續之前，您必須準備好以下資源。


****  

| 先決條件 | Description | 
| --- | --- | 
| Amazon S3 儲存貯體 | 如需詳細資訊，請參閱 Amazon Simple Storage Service 使用者指南中的[建立您的第一個 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/CreatingABucket.html)。儲存貯體必須位於與您的 OpenSearch Service 網域相同的區域中。 | 
| OpenSearch Service 網域 | 您的 Lambda 函數處理資料後的資料目的地。如需詳細資訊，請參閱[建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)。 | 

## 建立 Lambda 部署套件
<a name="integrations-s3-lambda-deployment-package"></a>

部署套件是指包含您的程式碼及其相依存項目的 ZIP 或 JAR 檔案。本節包括 Python 範本程式碼。如需其他程式設計語言，請參閱 *AWS Lambda 開發人員指南*中的 [Lambda 部署套件](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-package.html)。

1. 建立目錄。在此範例中，我們使用 `s3-to-opensearch` 這個名稱。

1. 在目錄中建立名為 `sample.py` 的檔案：

   ```
   import boto3
   import re
   import requests
   from requests_aws4auth import AWS4Auth
   
   region = '' # e.g. us-west-1
   service = 'es'
   credentials = boto3.Session().get_credentials()
   awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
   
   host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
   index = 'lambda-s3-index'
   datatype = '_doc'
   url = host + '/' + index + '/' + datatype
   
   headers = { "Content-Type": "application/json" }
   
   s3 = boto3.client('s3')
   
   # Regular expressions used to parse some simple log lines
   ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)')
   time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
   message_pattern = re.compile('\"(.+)\"')
   
   # Lambda execution starts here
   def handler(event, context):
       for record in event['Records']:
   
           # Get the bucket name and key for the new file
           bucket = record['s3']['bucket']['name']
           key = record['s3']['object']['key']
   
           # Get, read, and split the file into lines
           obj = s3.get_object(Bucket=bucket, Key=key)
           body = obj['Body'].read()
           lines = body.splitlines()
   
           # Match the regular expressions to each line and index the JSON
           for line in lines:
               line = line.decode("utf-8")
               ip = ip_pattern.search(line).group(1)
               timestamp = time_pattern.search(line).group(1)
               message = message_pattern.search(line).group(1)
   
               document = { "ip": ip, "timestamp": timestamp, "message": message }
               r = requests.post(url, auth=awsauth, json=document, headers=headers)
   ```

   編輯 `region` 和 `host` 的變數。

1. [安裝 pip](https://pip.pypa.io/en/stable/installation/) (如果您尚未安裝的話)，然後將相依項目安裝到新的 `package` 目錄：

   ```
   cd s3-to-opensearch
   
   pip install --target ./package requests
   pip install --target ./package requests_aws4auth
   ```

   所有 Lambda 執行環境均已安裝 [Boto3](https://aws.amazon.com/sdk-for-python/)，因此您不需要將其包含在您的部署套件中。

1. 封裝應用程式的程式碼和相依性：

   ```
   cd package
   zip -r ../lambda.zip .
   
   cd ..
   zip -g lambda.zip sample.py
   ```

## 建立 Lambda 函式
<a name="integrations-s3-lambda-create"></a>

建立部署套件後，可建立 Lambda 函數。當您建立函數時，請選擇名稱、執行時間 (例如，Python 3.8) 和 IAM 角色。IAM 角色定義函數的許可。如需詳細說明，請參閱 *AWS Lambda 開發人員指南*中的[使用主控台建立 Lambda 函數](https://docs.aws.amazon.com/lambda/latest/dg/get-started-create-function.html)。

此範例假設您正在使用主控台。選擇 Python 3.9 和具有 S3 讀取許可和 OpenSearch Service 寫入許可的角色，如以下螢幕擷取畫面所示：

![\[Lambda 函數的範本組態\]](http://docs.aws.amazon.com/zh_tw/opensearch-service/latest/developerguide/images/lambda-function.png)


在建立函數，您必須新增觸發。在此範例中，我們希望只要日誌檔案送達 S3 儲存貯體，便執行程式碼：

1. 選擇 **Add trigger** (新增觸發條件)，然後選取 **S3**。

1. 選擇您的儲存貯體。

1. 針對 **Event type (事件類型)** 選擇 **PUT**。

1. 針對 **Prefix (字首)** 輸入 `logs/`。

1. 對於 **Suffix** (尾碼)，輸入 `.log`。

1. 確認遞迴叫用警告，然後選擇 **Add** (新增)。

最後，您可以上傳部署套件：

1. 選擇 **Upload from** (上傳自) 和 **.zip file** (.zip 檔案)，然後依照提示上傳您的部署套件。

1. 上傳完成後，編輯 **Runtime settings** (執行時間設定)，然後將 **Handler** (處理常式) 變更為 `sample.handler`。此設定通知 Lambda 應該在觸發後執行的檔案 (`sample.py`) 和方法 (`handler`)。

此時，您有一整組的資源：日誌檔案的儲存貯體、只要日誌檔案新增到儲存貯體便執行的函數、執行剖析和索引的程式碼，以及可搜尋和視覺化的 OpenSearch Service 網域。

## 測試 Lambda 函數
<a name="integrations-s3-lambda-configure"></a>

在建立函數之後，您可以進行測試，做法是上傳檔案到 Amazon S3 儲存貯體。使用下列範例日誌行，建立名為 `sample.log` 的檔案：

```
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg"
12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
```

上傳檔案到您的 S3 儲存貯體的 `logs` 資料夾。如需指示說明，請參閱 *Amazon Simple Storage Service 使用者指南*中的[將物件上傳至您的儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/PuttingAnObjectInABucket.html)。

然後使用 OpenSearch Service 主控台或 OpenSearch Dashboards 來確認 `lambda-s3-index` 索引包含兩個文件。您也可以提出標準搜尋請求：

```
GET https://domain-name/lambda-s3-index/_search?pretty
{
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "lambda-s3-index",
        "_type" : "_doc",
        "_id" : "vTYXaWIBJWV_TTkEuSDg",
        "_score" : 1.0,
        "_source" : {
          "ip" : "12.345.678.91",
          "message" : "GET /some-file.jpg",
          "timestamp" : "10/Oct/2000:14:56:14 -0700"
        }
      },
      {
        "_index" : "lambda-s3-index",
        "_type" : "_doc",
        "_id" : "vjYmaWIBJWV_TTkEuCAB",
        "_score" : 1.0,
        "_source" : {
          "ip" : "12.345.678.90",
          "message" : "PUT /some-file.jpg",
          "timestamp" : "10/Oct/2000:13:55:36 -0700"
        }
      }
    ]
  }
}
```

# 從 Amazon Kinesis Data Streams 中載入串流資料
<a name="integrations-kinesis"></a>

您可以將串流資料從 Kinesis Data Streams 中載入至 OpenSearch Service。送達資料串流的新資料會觸發 Lambda 的事件通知，然後執行您的自訂程式碼以執行索引。本節包括一些簡單的 Python 範本程式碼。

## 先決條件
<a name="integrations-kinesis-lambda-prereq"></a>

繼續之前，您必須準備好以下資源。


| 先決條件 | Description | 
| --- | --- | 
| Amazon Kinesis Data Stream | Lambda 函數的事件來源。如需進一步了解，請參閱 [Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。 | 
| OpenSearch Service 網域 | 您的 Lambda 函數處理資料後的資料目的地。如需詳細資訊，請參閱 [建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)。 | 
| IAM 角色 |  此角色必須擁有基本 OpenSearch Service、Kinesis 和 Lambda 許可，例如下列項目：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "*"
    }
  ]
}
```     角色必須具有下列信任關係：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```     如需進一步了解，請參閱 *IAM 使用者指南*中的[建立 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。  | 

# 建立 Lambda 函式
<a name="integrations-kinesis-lambda"></a>

遵循[建立 Lambda 部署套件](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的指示，但要建立名為 `kinesis-to-opensearch` 的目錄，並使用以下適用於 `sample.py` 的程式碼：

```
import base64
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-kine-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        id = record['eventID']
        timestamp = record['kinesis']['approximateArrivalTimestamp']

        # Kinesis data is base64-encoded, so decode here
        message = base64.b64decode(record['kinesis']['data'])

        # Create the JSON document
        document = { "id": id, "timestamp": timestamp, "message": message }
        # Index the document
        r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return 'Processed ' + str(count) + ' items.'
```

編輯 `region` 和 `host` 的變數。

[安裝 pip](https://pip.pypa.io/en/stable/installation/) (如果您尚未安裝的話)，然後使用下列命令安裝相依項目：

```
cd kinesis-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然後，遵循[建立 Lambda 函式](integrations-s3-lambda.md#integrations-s3-lambda-create)中的指示，但要從[先決條件](integrations-kinesis.md#integrations-kinesis-lambda-prereq)指定 IAM 角色和下列用於觸發的設定：
+ **Kinesis 串流**：您的 Kinesis 串流
+ **批次大小**：100
+ **開始位置**：水平修剪

如需進一步了解，請參閱 *Amazon Kinesis Data Streams 開發人員指南* 中的[什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/working-with-kinesis.html)。

此時，您有一整組的資源：Kinesis 資料串流、在串流收到新資料並索引該資料後執行的函數，以及可搜尋和視覺化的 OpenSearch Service 網域。

# 測試 Lambda 函數
<a name="integrations-kinesis-testing"></a>

在建立函數後，您可以做測試，方法是將新記錄加入到使用 AWS CLI的資料串流：

```
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1
```

然後使用 OpenSearch Service 主控台或 OpenSearch Dashboards 來確認 `lambda-kine-index` 包含一個文件。您也可以使用以下請求：

```
GET https://domain-name/lambda-kine-index/_search
{
  "hits" : [
    {
      "_index": "lambda-kine-index",
      "_type": "_doc",
      "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
      "_score": 1,
      "_source": {
        "timestamp": 1523648740.051,
        "message": "My test data.",
        "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
      }
    }
  ]
}
```

# 從 Amazon DynamoDB 中載入串流資料
<a name="integrations-dynamodb"></a>

您可以使用 從 Amazon DynamoDB AWS Lambda 將資料傳送至 OpenSearch Service 網域。送達資料庫資料表的新資料會觸發 Lambda 的事件通知，然後執行您的自訂程式碼以執行索引。

## 先決條件
<a name="integrations-dynamodb-prereq"></a>

繼續之前，您必須準備好以下資源。


| 先決條件 | Description | 
| --- | --- | 
| DynamoDB 表 | 表格中包含您的來源資料。如需詳細資訊，請參閱 *Amazon DynamoDB 開發人員指南*中的 [DynamoDB 資料表上的基本操作](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html)。資料表必須位於和 OpenSearch Service 網域相同的區域，並將串流設定為 **New image** (新映像)。如需進一步了解，請參閱[啟用串流](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)。 | 
| OpenSearch Service 域 | 您的 Lambda 函數處理資料後的資料目的地。如需詳細資訊，請參閱[建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)。 | 
| IAM 角色 | 此角色必須擁有基本 OpenSearch Service、DynamoDB 和 Lambda 執行許可，例如下列項目：  JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
```    角色必須具有下列信任關係：  JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```    如需進一步了解，請參閱 *IAM 使用者指南*中的[建立 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。 | 

## 建立 Lambda 函式
<a name="integrations-dynamodb-lambda"></a>

遵循[建立 Lambda 部署套件](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的指示，但要建立名為 `ddb-to-opensearch` 的目錄，並使用以下適用於 `sample.py` 的程式碼：

```
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-east-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        # Get the primary key for use as the OpenSearch ID
        id = record['dynamodb']['Keys']['id']['S']

        if record['eventName'] == 'REMOVE':
            r = requests.delete(url + id, auth=awsauth)
        else:
            document = record['dynamodb']['NewImage']
            r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return str(count) + ' records processed.'
```

編輯 `region` 和 `host` 的變數。

[安裝 pip](https://pip.pypa.io/en/stable/installation/) (如果您尚未安裝的話)，然後使用下列命令安裝相依項目：

```
cd ddb-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然後，遵循[建立 Lambda 函式](integrations-s3-lambda.md#integrations-s3-lambda-create)中的指示，但要從[先決條件](#integrations-dynamodb-prereq)指定 IAM 角色和下列用於觸發的設定：
+ **資料表**：您的 DynamoDB 資料表
+ **批次大小**：100
+ **開始位置**：水平修剪

如需進一步了解，請參閱 *Amazon DynamoDB 開發人員指南*中的[使用 DynamoDB Streams 和 Lambda 來處理新項目](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.html)。

此時，您有一整組的資源：來源資料的 DynamoDB 資料表、資料表的 DynamoDB 串流變更、在您的來源資料變更並索引這些變更後所執行的函數，以及可搜尋和視覺化的 OpenSearch Service 網域。

## 測試 Lambda 函數
<a name="integrations-dynamodb-lambda-test"></a>

在建立函數後，您可以進行測試，方法是將新項目新增至使用 AWS CLI的 DynamoDB 資料表：

```
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1
```

然後使用 OpenSearch Service 主控台或 OpenSearch Dashboards 來確認 `lambda-index` 包含一個文件。您也可以使用以下請求：

```
GET https://domain-name/lambda-index/_doc/00001
{
    "_index": "lambda-index",
    "_type": "_doc",
    "_id": "00001",
    "_version": 1,
    "found": true,
    "_source": {
        "director": {
            "S": "Kevin Costner"
        },
        "id": {
            "S": "00001"
        },
        "title": {
            "S": "The Postman"
        }
    }
}
```

# 從 Amazon Data Firehose 載入串流資料
<a name="integrations-fh"></a>

Firehose 支援 OpenSearch Service 作為交付目的地。如需如何將串流資料載入 OpenSearch Service 的指示，請參閱《*Amazon* [Data Firehose 開發人員指南》中的建立 Kinesis Data Firehose 交付串流](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)和[為您的目的地選擇 OpenSearch Service](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-elasticsearch)。

將資料載入到 OpenSearch Service 之前，您可能需要執行資料轉換。如需進一步了解如何使用 Lambda 函數來執行此任務，請參閱相同指南中的 [Amazon Kinesis Data Firehose 資料轉換](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)。

當您設定交付串流時，Firehose 具有「一鍵式」IAM 角色，提供它將資料傳送至 OpenSearch Service、備份 Amazon S3 上的資料，以及使用 Lambda 轉換資料所需的資源存取權。由於手動建立角色涉及複雜度，我們建議您使用所提供角色。

# 從 Amazon CloudWatch 中載入串流資料
<a name="integrations-cloudwatch"></a>

透過使用 CloudWatch Logs 訂閱，您可以將串流資料從 CloudWatch Logs 載入 OpenSearch Service 網域。如需有關 Amazon CloudWatch 訂閱的資訊，請參閱[使用訂閱即時處理日誌資料](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/Subscriptions.html)。如需組態資訊，請參閱 *Amazon CloudWatch 開發人員指南*中的[將 CloudWatch Logs 資料串流到 Amazon OpenSearch Service](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_OpenSearch_Stream.html)。

## 從 載入串流資料 AWS IoT
<a name="integrations-cloudwatch-iot"></a>

您可以使用 AWS IoT [規則](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules.html)從 傳送資料。如需進一步了解，請參閱 *AWS IoT 開發人員指南*中的 [OpenSearch](https://docs.aws.amazon.com/iot/latest/developerguide/opensearch-rule-action.html) 動作。