

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 OpenSearch 摄取管道与 AWS Lambda
<a name="configure-client-lambda"></a>

使用[AWS Lambda 处理](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/aws-lambda/)器使用自定义代码丰富来自 OpenSearch Ingestion 支持的任何源或目标的数据。使用 Lambda 处理器，您可以应用自己的数据转换或丰富，然后将处理后的事件返回到您的管道，以供进一步处理。该处理器支持自定义数据处理，让您能够在数据进入处理流程前完全掌控其操作方式。

**注意**  
Lambda 处理器处理单个事件的有效载荷大小限制为 5 MB。此外，Lambda 处理器仅支持 JSON 数组格式的响应。

## 先决条件
<a name="configure-clients-lambda-prereqs"></a>

使用 Lambda 处理器创建管道之前，创建以下资源：
+ 一种 AWS Lambda 用于丰富和转换源数据的函数。有关说明，请参阅[创建首个 Lambda 函数](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)。
+ 将成为管道接收器的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息，请参阅[创建 OpenSearch 服务域](createupdatedomains.md#createdomains)和[创建集合](serverless-create.md)。
+ 包含向域或集合接收器写入权限的管道角色。有关更多信息，请参阅 [管道角色](pipeline-security-overview.md#pipeline-security-sink)。

  管道角色还需附加权限策略，以允许其调用管道配置中指定的 Lambda 函数。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "allowinvokeFunction",
              "Effect": "Allow",
              "Action": [
                  "lambda:invokeFunction",
                  "lambda:InvokeAsync",
                  "lambda:ListFunctions"
              ],
              "Resource": "arn:aws:lambda:{{us-east-1}}:{{111122223333}}:function:{{function-name}}"
              
          }
      ]
  }
  ```

------

## 创建管道
<a name="configure-clients-security-lake-pipeline-role"></a>

要 AWS Lambda 用作处理器，请配置 OpenSearch Ingestion 管道并指定`aws_lambda`为处理器。您也可以使用 **AWS Lambda 自定义增强**蓝图创建管道。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

以下示例管道从 HTTP 源接收数据，使用日期处理器和处理器对其进行丰富，然后将 AWS Lambda 处理后的数据提取到域中。 OpenSearch 

```
version: "2"
lambda-processor-pipeline:
  source:
    http:
      path: "/${pipelineName}/logs"
  processor:
      - date:
        destination: "@timestamp"
        from_time_received: true
    - aws_lambda:
        function_name: "my-lambda-function"

        tags_on_failure: ["lambda_failure"]
        batch:
            key_name: "events"
        aws:
          region: {{region}}
  sink:
    - opensearch:
        hosts: [ "https://search-{{mydomain}}.{{us-east-1}}es.amazonaws.com" ]
        index: "table-index"
        aws:
          region: "{{region}}"
          serverless: false
```

以下示例 AWS Lambda 函数通过向提供的事件数组中的每个元素添加新的键值对 (`"transformed": "true"`) 来转换传入的数据，然后发回修改后的版本。

```
import json

def lambda_handler(event, context):
    input_array = event.get('events', [])
    output = []
    for input in input_array:
        input["transformed"] = "true";
        output.append(input)

    return output
```

## 批处理
<a name="configure-clients-lambda-batching"></a>

管道向 Lambda 处理器发送批处理事件，并动态调整批量大小，以确保其始终低于 5 MB 的限制。

下图是管道批处理的示例：

```
batch:
    key_name: "events"

input_arrary = event.get('events', [])
```

**注意**  
创建管道时，确保 Lambda 处理器配置中的 `key_name` 选项与 Lambda 处理程序中的事件密钥相匹配。

## 条件筛选
<a name="configure-clients-lambda-conditional-filtering"></a>

条件筛选允许您根据事件数据中的特定条件控制 AWS Lambda 处理器何时调用 Lambda 函数。当您希望有选择地处理某些类型的事件，同时忽略其他事件时，这尤其有用。

以下示例配置使用条件筛选：

```
processors:
  - aws_lambda:
      function_name: "my-lambda-function"
      aws:
        region: "region"
      lambda_when: "/sourceIp == 10.10.10.10"
```