

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将流数据加载到 Amazon OpenSearch 服务
<a name="integrations"></a>

您可以使用 OpenSearch Ingestion 将[流数据](https://aws.amazon.com/streaming-data/)直接加载到您的亚马逊 OpenSearch 服务域中，无需使用第三方解决方案。要将数据发送到 OpenSearch Ingestion，您需要配置数据生成器，服务会自动将数据传送到您指定的域或集合。要开始使用 OpenSearch Ingestion，请参阅。[教程：使用 Amazon Ingestion 将数据提取到集合中 OpenSearch](osis-serverless-get-started.md)

您仍然可以使用其他来源加载流数据，例如 Amazon Data Firehose 和 Amazon CloudWatch Logs，它们内置了对 OpenSearch 服务的支持。其他用户（如 Amazon S3、Amazon Kinesis Data Streams 和 Amazon DynamoDB）使用 AWS Lambda 函数作为事件处理程序。Lambda 函数响应新数据的方式是处理数据并将其流式传输到域。

**注意**  
Lambda 支持多种常用编程语言，并且在大多数 AWS 区域中都可用。有关更多信息，请参阅 *AWS Lambda 开发人员指南*中的 [Lambda 入门](https://docs.aws.amazon.com/lambda/latest/dg/lambda-app.html)和 *AWS 一般参考* 中的 [AWS 服务端点](https://docs.aws.amazon.com/general/latest/gr/rande.html#lambda_region)。

# 从 OpenSearch Ingestion 加载流数据
<a name="integrations-osis"></a>

您可以使用 Amazon OpenSearch Ingestion 将数据加载到 OpenSearch 服务域中。您可以将数据生成器配置为将数据发送到 OpenSearch Ingestion，它会自动将数据传输到您指定的集合。您还可以将 OpenSearch Ingestion 配置为在交付数据之前对其进行转换。有关更多信息，请参阅 [Amazon OpenSearch Ingestion 概述](ingestion.md)。

# 从 Amazon S3 表中加载流数据
<a name="integrations-s3-lambda"></a>

您可以使用 Lambda 将数据从 Amazon S3 发送到您的 OpenSearch 服务域。到达 S3 存储桶的新数据将触发事件通知到 Lambda，这将运行自定义代码以执行编制索引。

这种流式传输数据的方式极其灵活。可以[为对象元数据编制索引](https://aws.amazon.com/blogs/database/indexing-metadata-in-amazon-elasticsearch-service-using-aws-lambda-and-python/)，或者如果对象是纯文本，则对对象正文的部分元素进行解析和编制索引。此节包含一些简单的 Python 示例代码，这些代码使用正则表达式解析日志文件并为匹配项编制索引。

## 先决条件
<a name="integrations-s3-lambda-prereq"></a>

继续操作之前，必须具有以下资源。


****  

| 先决条件 | 说明 | 
| --- | --- | 
| 亚马逊 S3 存储桶 | 有关更多信息，请参阅 Amazon Simple Storage Service 用户指南中的[创建您的第一个 S3 存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/CreatingABucket.html)。存储桶必须与您的 OpenSearch 服务域位于同一区域。 | 
| OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息，请参阅 [创建 OpenSearch 服务域](createupdatedomains.md#createdomains)。 | 

## 创建 Lambda 部署程序包
<a name="integrations-s3-lambda-deployment-package"></a>

部署程序包为 ZIP 或 JAR 文件，其中包含代码及其依赖项。此节包括 Python 示例代码。对于其他编程语言，请参阅 *AWS Lambda 开发人员指南*中的 [Lambda 部署程序包](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-package.html)。

1. 创建目录。在此示例中，我们使用名称 `s3-to-opensearch`。

1. 在名为 `sample.py` 的目录中创建一个文件：

   ```
   import boto3
   import re
   import requests
   from requests_aws4auth import AWS4Auth
   
   region = '' # e.g. us-west-1
   service = 'es'
   credentials = boto3.Session().get_credentials()
   awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
   
   host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
   index = 'lambda-s3-index'
   datatype = '_doc'
   url = host + '/' + index + '/' + datatype
   
   headers = { "Content-Type": "application/json" }
   
   s3 = boto3.client('s3')
   
   # Regular expressions used to parse some simple log lines
   ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)')
   time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]')
   message_pattern = re.compile('\"(.+)\"')
   
   # Lambda execution starts here
   def handler(event, context):
       for record in event['Records']:
   
           # Get the bucket name and key for the new file
           bucket = record['s3']['bucket']['name']
           key = record['s3']['object']['key']
   
           # Get, read, and split the file into lines
           obj = s3.get_object(Bucket=bucket, Key=key)
           body = obj['Body'].read()
           lines = body.splitlines()
   
           # Match the regular expressions to each line and index the JSON
           for line in lines:
               line = line.decode("utf-8")
               ip = ip_pattern.search(line).group(1)
               timestamp = time_pattern.search(line).group(1)
               message = message_pattern.search(line).group(1)
   
               document = { "ip": ip, "timestamp": timestamp, "message": message }
               r = requests.post(url, auth=awsauth, json=document, headers=headers)
   ```

   编辑 `region` 和 `host` 的变量。

1. [安装 pip](https://pip.pypa.io/en/stable/installation/)（如果您尚未安装，则将依赖项安装到 `package` 目录：

   ```
   cd s3-to-opensearch
   
   pip install --target ./package requests
   pip install --target ./package requests_aws4auth
   ```

   所有 Lambda 执行环境都已安装 [Boto3](https://aws.amazon.com/sdk-for-python/)，因此无需将其包含在部署程序包中。

1. 打包应用程序代码和依赖项：

   ```
   cd package
   zip -r ../lambda.zip .
   
   cd ..
   zip -g lambda.zip sample.py
   ```

## 创建 Lambda 函数
<a name="integrations-s3-lambda-create"></a>

创建部署程序包之后，可以创建 Lambda 函数。创建函数时，选择名称、运行时 (例如，Python 3.8) 和 IAM 角色。IAM 角色定义对函数的权限。有关详细说明，请参阅 *AWS Lambda 开发人员指南*中的[通过控制台创建 Lambda 函数](https://docs.aws.amazon.com/lambda/latest/dg/get-started-create-function.html)。

此示例假定使用的是控制台。选择 Python 3.9 以及具有 S3 读取权限和 OpenSearch 服务写入权限的角色，如以下屏幕截图所示：

![\[Lambda 函数的示例配置\]](http://docs.aws.amazon.com/zh_cn/opensearch-service/latest/developerguide/images/lambda-function.png)


在创建此函数后，必须添加一个触发器。在此示例中，我们希望代码在日志文件到达 S3 存储桶中时执行：

1. 选择**添加触发器**并选择 **S3**。

1. 选择存储桶。

1. 对于 **Event type (事件类型)**，选择 **PUT**。

1. 对于 **Prefix (前缀)**，键入 `logs/`。

1. 对于**后缀**，键入 `.log`。

1. 确认递归调用警告，然后选择**添加**。

最后，可以上传部署程序包：

1. 选择**上载自**和 **.zip 文件**，然后按照提示上传部署程序包。

1. 上载完成后，编辑 **Runtime 设置**并更改**处理程序**为 `sample.handler`。此设置告知 Lambda 在触发之后应执行的文件 (`sample.py`) 和方法 (`handler`)。

此时，您拥有一整套资源：一个用于存储日志文件的存储桶、一个在向存储桶中添加日志文件时运行的函数、执行解析和索引的代码，以及一个用于搜索和可视化的 OpenSearch 服务域。

## 测试 Lambda 函数
<a name="integrations-s3-lambda-configure"></a>

在创建此函数之后，可以通过将文件上传到 Amazon S3 存储桶来测试此函数。使用以下示例日志行创建一个名为 `sample.log` 的文件：

```
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg"
12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
```

将文件上传到 S3 存储桶的 `logs` 文件夹。有关说明，请参阅 *Amazon Simple Storage Service 用户指南*中的[将对象上传到存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/PuttingAnObjectInABucket.html)。

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证`lambda-s3-index`索引是否包含两个文档。还可以发出标准搜索请求：

```
GET https://domain-name/lambda-s3-index/_search?pretty
{
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "lambda-s3-index",
        "_type" : "_doc",
        "_id" : "vTYXaWIBJWV_TTkEuSDg",
        "_score" : 1.0,
        "_source" : {
          "ip" : "12.345.678.91",
          "message" : "GET /some-file.jpg",
          "timestamp" : "10/Oct/2000:14:56:14 -0700"
        }
      },
      {
        "_index" : "lambda-s3-index",
        "_type" : "_doc",
        "_id" : "vjYmaWIBJWV_TTkEuCAB",
        "_score" : 1.0,
        "_source" : {
          "ip" : "12.345.678.90",
          "message" : "PUT /some-file.jpg",
          "timestamp" : "10/Oct/2000:13:55:36 -0700"
        }
      }
    ]
  }
}
```

# 从 Amazon Kinesis Data Streams 加载流数据
<a name="integrations-kinesis"></a>

您可以将流数据从 Kinesis Data Streams 加载 OpenSearch 到服务。到达此数据流的新数据将向 Lambda 触发事件通知，这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。

## 先决条件
<a name="integrations-kinesis-lambda-prereq"></a>

继续操作之前，必须具有以下资源。


| 先决条件 | 说明 | 
| --- | --- | 
| Amazon Kinesis Data Stream | Lambda 函数的事件源。要了解更多信息，请参阅 [Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。 | 
| OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息，请参阅 [创建 OpenSearch 服务域](createupdatedomains.md#createdomains) | 
| IAM 角色 |  此角色必须具有基本的 OpenSearch 服务、Kinesis 和 Lambda 权限，例如：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "*"
    }
  ]
}
```     角色必须拥有以下信任关系：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```     要了解更多信息，请参阅 *IAM 用户手册*中的[创建 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。  | 

# 创建 Lambda 函数
<a name="integrations-kinesis-lambda"></a>

按照[创建 Lambda 部署程序包](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的说明操作，但创建一个名为 `kinesis-to-opensearch` 的目录并对 `sample.py` 使用以下代码：

```
import base64
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-kine-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        id = record['eventID']
        timestamp = record['kinesis']['approximateArrivalTimestamp']

        # Kinesis data is base64-encoded, so decode here
        message = base64.b64decode(record['kinesis']['data'])

        # Create the JSON document
        document = { "id": id, "timestamp": timestamp, "message": message }
        # Index the document
        r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return 'Processed ' + str(count) + ' items.'
```

编辑 `region` 和 `host` 的变量。

[安装 pip](https://pip.pypa.io/en/stable/installation/)——如果您尚未安装，则使用以下命令安装依赖项：

```
cd kinesis-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然后按照[创建 Lambda 函数](integrations-s3-lambda.md#integrations-s3-lambda-create)中的说明操作，但指定[先决条件](integrations-kinesis.md#integrations-kinesis-lambda-prereq)中的 IAM 角色和以下触发器设置：
+ **Kinesis stream**：您的 Kinesis stream
+ **批处理大小**：100
+ **起始位置**：时间范围

有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/working-with-kinesis.html)。

此时，您已拥有一整套资源：Kinesis 数据流、在流接收新数据并索引该数据之后运行的函数，以及用于搜索和可视化的 OpenSearch 服务域。

# 测试 Lambda 函数
<a name="integrations-kinesis-testing"></a>

创建此函数后，可以通过使用 AWS CLI将新记录添加到数据流来测试它：

```
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1
```

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否`lambda-kine-index`包含文档。还可使用以下请求：

```
GET https://domain-name/lambda-kine-index/_search
{
  "hits" : [
    {
      "_index": "lambda-kine-index",
      "_type": "_doc",
      "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
      "_score": 1,
      "_source": {
        "timestamp": 1523648740.051,
        "message": "My test data.",
        "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
      }
    }
  ]
}
```

# 从 Amazon DynamoDB 表中加载流数据
<a name="integrations-dynamodb"></a>

您可以使用 AWS Lambda 将数据从亚马逊 DynamoDB 发送到您的 OpenSearch 服务域。到达数据库表的新数据将触发事件通知到 Lambda，这将运行自定义代码以执行编制索引。

## 先决条件
<a name="integrations-dynamodb-prereq"></a>

继续操作之前，必须具有以下资源。


| 先决条件 | 说明 | 
| --- | --- | 
| DynamoDB 表 | 此表包含源数据。有关更多信息，请参阅 *Amazon DynamoDB 开发人员指南*中的 [DynamoDB Tables 中的基本操作](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html)。该表必须与您的 OpenSearch 服务域位于同一区域，并且必须将直播设置为 “**新图像**”。要了解更多信息，请参阅[启用流](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)。 | 
| OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息，请参阅 [创建 OpenSearch 服务域](createupdatedomains.md#createdomains)。 | 
| IAM 角色 | 此角色必须具有基本的 OpenSearch 服务、DynamoDB 和 Lambda 执行权限，例如：  JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
```    角色必须拥有以下信任关系：  JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```    要了解更多信息，请参阅 *IAM 用户手册*中的[创建 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。 | 

## 创建 Lambda 函数
<a name="integrations-dynamodb-lambda"></a>

按照[创建 Lambda 部署程序包](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的说明操作，但创建一个名为 `ddb-to-opensearch` 的目录并对 `sample.py` 使用以下代码：

```
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-east-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        # Get the primary key for use as the OpenSearch ID
        id = record['dynamodb']['Keys']['id']['S']

        if record['eventName'] == 'REMOVE':
            r = requests.delete(url + id, auth=awsauth)
        else:
            document = record['dynamodb']['NewImage']
            r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return str(count) + ' records processed.'
```

编辑 `region` 和 `host` 的变量。

[安装 pip](https://pip.pypa.io/en/stable/installation/)——如果您尚未安装，则使用以下命令安装依赖项：

```
cd ddb-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然后按照[创建 Lambda 函数](integrations-s3-lambda.md#integrations-s3-lambda-create)中的说明操作，但指定[先决条件](#integrations-dynamodb-prereq)中的 IAM 角色和以下触发器设置：
+ **表**：DynamoDB 表
+ **批处理大小**：100
+ **起始位置**：时间范围

要了解更多信息，请参阅 *Amazon DynamoDB 开发人员指南*中[使用 DynamoDB Streams 和 Lambda 处理新项目](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.html)。

此时，您已拥有一整套资源：用于存放源数据的 DynamoDB 表、表更改的 DynamoDB 流、在源数据更改并索引这些更改后运行的函数，以及用于搜索和可视化的服务域。 OpenSearch 

## 测试 Lambda 函数
<a name="integrations-dynamodb-lambda-test"></a>

创建此函数后，可以通过使用 AWS CLI将新项目添加到 DynamoDB 表来测试它：

```
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1
```

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否`lambda-index`包含文档。还可使用以下请求：

```
GET https://domain-name/lambda-index/_doc/00001
{
    "_index": "lambda-index",
    "_type": "_doc",
    "_id": "00001",
    "_version": 1,
    "found": true,
    "_source": {
        "director": {
            "S": "Kevin Costner"
        },
        "id": {
            "S": "00001"
        },
        "title": {
            "S": "The Postman"
        }
    }
}
```

# 从 Amazon Data Firehose 加载流数据
<a name="integrations-fh"></a>

Firehose 支持将 OpenSearch 服务作为送货目的地。有关如何将流数据加载到 OpenSearch 服务的说明，请参阅《*亚马逊数据 Fireh [ose 开发者指南》中的创建 Kinesis Data Firehos](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html) e* [传送流 OpenSearch 和为目的地选择服务](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-elasticsearch)。

在将数据加载到 S OpenSearch ervice 之前，可能需要对数据执行转换。要了解有关使用 Lambda 函数执行此任务的更多信息，请参阅此同一指南中的 [Amazon Kinesis Data Firehose 数据转换](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)。

在您配置传输流时，Firehose 具有 “一键式” IAM 角色，可为其提供向 OpenSearch 服务发送数据、在 Amazon S3 上备份数据以及使用 Lambda 转换数据所需的资源访问权限。由于手动创建此类角色的过程非常复杂，我们建议使用提供的角色。

# 正在加载来自亚马逊的流媒体数据 CloudWatch
<a name="integrations-cloudwatch"></a>

您可以使用 CloudWatch CloudWatch 日志订阅将流数据从 Logs 加载到您的 OpenSearch 服务域。有关 Amazon CloudWatch 订阅的信息，请参阅[通过订阅实时处理日志数据](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/Subscriptions.html)。有关配置信息，请参阅《[亚马逊* CloudWatch开发者指南》中的将 CloudWatch 日志数据流式传输到亚马逊 OpenSearch *服务](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_OpenSearch_Stream.html)。

## 加载来自的流数据 AWS IoT
<a name="integrations-cloudwatch-iot"></a>

 AWS IoT 您可以使用[规则](https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules.html)发送数据。要了解更多信息，请参阅《*AWS IoT 开发者指南*》中的[OpenSearch](https://docs.aws.amazon.com/iot/latest/developerguide/opensearch-rule-action.html)操作。