

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Amazon S3 中使用 OpenSearch 摄取管道
<a name="configure-client-s3"></a>

借助 OpenSearch Ingestion，您可以将 Amazon S3 用作源或目标。当您使用 Amazon S3 作为数据源时，会将数据发送到 OpenSearch 摄取管道。当您使用 Amazon S3 作为目标时，会将数据从 OpenSearch 摄取管道写入到一个或多个 S3 存储桶。

**Topics**
+ [亚马逊 S3 作为来源](#s3-source)
+ [Amazon S3 作为目标](#s3-destination)
+ [将 Amazon S3 跨账户作为源](#fdsf)

## 亚马逊 S3 作为来源
<a name="s3-source"></a>

您可以通过两种方式使用 Amazon S3 作为源处理数据：*S3-SQS 处理*和*计划扫描*。

如果您需要在文件写入 S3 后近实时扫描文件，请使用 S3-SQS 处理。您可以配置 Amazon S3 存储桶，在存储桶中存储或修改对象时随时触发事件。使用一次性扫描或定期计划扫描批处理 S3 存储桶中的数据。

**Topics**
+ [先决条件](#s3-prereqs)
+ [步骤 1：配置管道角色](#s3-pipeline-role)
+ [步骤 2：创建管道](#s3-pipeline)

### 先决条件
<a name="s3-prereqs"></a>

要使用 Amazon S3 作为预定扫描或 S3-SQS 处理的 OpenSearch 摄取管道的来源，[请先创建一个 S3 存储桶](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)。

**注意**  
如果 OpenSearch Ingestion 管道中用作源的 S3 存储桶位于不同的存储桶中 AWS 账户，则还需要对该存储桶启用跨账户读取权限。这样管道将可读取和处理数据。要启用跨账户权限，请参阅 *Amazon S3 用户指南*中的[存储桶拥有者授予跨账户存储桶权限](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html)。  
如果您的 S3 存储桶位于多个账户中，请使用 `bucket_owners` 映射。有关示例，请参阅 OpenSearch文档中的[跨账户 S3 访问权限](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#cross-account-s3-access)。

要设置 S3-SQS 处理，还需要执行以下步骤：

1. [创建 Amazon SQS 队列](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html)。

1. 在以 SQS 队列为目标的 S3 存储桶上[启用事件通知](https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html)。

### 步骤 1：配置管道角色
<a name="s3-pipeline-role"></a>

与其他将数据*推送*到管道的源插件不同，[S3 源插件](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/)采用基于读取的架构，管道从源中*拉取*数据。

因此，为使管道能够从 S3 读取，必须在管道的 S3 源配置中指定一个可以同时访问 S3 存储桶和 Amazon SQS 队列的角色。管道将担任此角色，以便从队列中读取数据。

**注意**  
在 S3 源配置中指定的角色必须是[管道角色]()。因此，管道角色必须包含两个单独的权限策略，一个用于写入接收器，另一个用于从 S3 源中拉取。您必须在所有管道组件中使用相同的 `sts_role_arn`。

以下示例策略显示了使用 S3 作为源所需的权限：

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action":[
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:GetObject"
       ],
      "Resource": "arn:aws:s3:::amzn-s3-demo-bucket/*"
    },
    {
       "Effect":"Allow",
       "Action":"s3:ListAllMyBuckets",
       "Resource":"arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:us-east-1:111122223333:MyS3EventSqsQueue"
    }
  ]
}
```

------

 必须将以下权限附加到在 S3 源插件配置的 `sts_role_arn` 选项中指定的 IAM 角色：

```
version: "2"
source:
  s3:
    ...
    aws:
      ...
processor:
  ...
sink:
  - opensearch:
      ...
```

### 步骤 2：创建管道
<a name="s3-pipeline"></a>

设置权限后，您可以根据您的 Amazon S3 用例配置 OpenSearch 摄取管道。

#### S3-SQS 处理
<a name="s3-sqs-processing"></a>

要设置 S3-SQS 处理，请配置您的管道，指定 S3 作为源并设置 Amazon SQS 通知：

```
version: "2"
s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline: null
      sqs:
        queue_url: "https://sqs.us-east-1amazonaws.com/account-id/ingestion-queue"
      compression: "none"
      aws:
        region: "region"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
      index: "index-name"
      aws:
        region: "region"
```

如果您在处理 Amazon S3 上的小文件时发现 CPU 利用率较低，可考虑通过修改 `workers` 选项的值来提高吞吐量。有关更多信息，请参阅 [S3 plugin configuration options](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#configuration)。

#### 计划扫描
<a name="s3-scheduled-scan"></a>

要设置计划扫描，请使用适用于所有 S3 存储桶的扫描级别或存储桶级别的计划来配置管道。存储桶级别计划或扫描间隔配置始终覆盖扫描级别配置。

您可以使用*一次性扫描*（非常适合数据迁移）或*定期扫描*（非常适合批处理）配置计划扫描。

要将管道配置为从 Amazon S3 读取，请使用预先配置的 Amazon S3 蓝图。您可以编辑管道配置的 `scan` 部分以满足计划需求。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

**一次性扫描**

一次性计划扫描运行一次。在管道配置中，您可以使用 `start_time` 和 `end_time` 指定希望何时扫描存储桶中的对象。或者，您也可以使用 `range` 指定相对于当前时间的时间间隔，以该时间间隔扫描存储桶中的对象。

例如，范围设置为 `PT4H` 将扫描最近四个小时内创建的所有文件。要配置再次运行一次性扫描，必须先停止管道，然后再重新启动。如果未配置范围，则还必须更新开始时间和结束时间。

以下配置为所有存储桶及这些存储桶中的所有对象设置一次性扫描：

```
version: "2"
log-pipeline:
  source:
    s3:
      codec:
        csv:
      compression: "none"
      aws:
        region: "region"
      acknowledgments: true
      scan:
        buckets:
          - bucket:
              name: my-bucket
              filter:
                include_prefix:
                  - Objects1/
                exclude_suffix:
                  - .jpeg
                  - .png
          - bucket:
              name: my-bucket-2
              key_prefix:
                include:
                  - Objects2/
                exclude_suffix:
                  - .jpeg
                  - .png
      delete_s3_objects_on_read: false
  processor:
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
        index: "index-name"
        aws:
          region: "region"
        dlq:
          s3:
            bucket: "dlq-bucket"
            region: "us-east-1"
```

以下配置设置在指定时段内对所有存储桶运行一次性扫描。这意味着 S3 仅处理创建时间在此时段内的对象。

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

以下配置设置扫描级别和存储桶级别一次性扫描。存储桶级别开始时间和结束时间将覆盖扫描级别开始时间和结束时间。

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

停止管道会移除对管道在停止之前所扫描对象的任何先前引用。如果停止单个扫描管道，则将在启动后重新扫描所有对象，即使这些已被扫描。如果您需要停止单个扫描管道，建议在重新启动管道之前更改时间段。

如果您需要按开始时间和结束时间筛选对象，则停止和启动管道是唯一的选择。如果您不需要按开始时间和结束时间进行筛选，则可以按名称筛选对象。按名称筛选对象不需要停止和启动管道。要执行此操作，请使用 `include_prefix` 和 `exclude_suffix`。

**定期扫描**

定期计划扫描按定期计划时间间隔对您指定的 S3 存储桶运行扫描。只能在扫描级别配置间隔，因为不支持单独执行存储桶级别配置。

在管道配置中，`interval` 将指定定期扫描频率，范围介于 30 秒到 365 天之间。始终在创建管道时运行首次扫描。`count` 定义扫描实例总数。

以下配置设置定期扫描，两次扫描之间的延迟为 12 小时：

```
scan:
  scheduling:
    interval: PT12H
    count: 4
  buckets:
    - bucket:
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

## Amazon S3 作为目标
<a name="s3-destination"></a>

[要将数据从 OpenSearch 摄取管道写入 S3 存储桶，请使用预配置的 S3 蓝图创建带有 S3 接收器的管道。](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/)该管道将选择性数据路由到 OpenSearch 接收器，同时将所有数据发送到 S3 中进行存档。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

创建 S3 接收器时，您可以从各种不同的[接收器编解码器](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/#codec)指定首选格式。例如，如果要以列式格式写入数据，请选择 Parquet 或 Avro 编解码器。如果您更喜欢基于行的格式，请选择 JSON 或 NDJSON。要将数据写入指定架构中的 S3，您还可以使用 [Avro](https://avro.apache.org/docs/current/specification/#schema-declaration) 格式在接收器编解码器中定义内联架构。

以下示例在 S3 接收器中定义内联架构：

```
- s3:
  codec:
    parquet:
      schema: >
        {
           "type" : "record",
           "namespace" : "org.vpcFlowLog.examples",
           "name" : "VpcFlowLog",
           "fields" : [
             { "name" : "version", "type" : "string"},
             { "name" : "srcport", "type": "int"},
             { "name" : "dstport", "type": "int"},
             { "name" : "start", "type": "int"},
             { "name" : "end", "type": "int"},
             { "name" : "protocol", "type": "int"},
             { "name" : "packets", "type": "int"},
             { "name" : "bytes", "type": "int"},
             { "name" : "action", "type": "string"},
             { "name" : "logStatus", "type" : "string"}
           ]
         }
```

定义此架构时，请指定管道向接收器发送的不同类型事件中可能存在的所有键的超集。

例如，如果事件可能缺少键，则在架构中添加值为 `null` 的键。Null 值声明允许架构处理非统一数据（一些事件具有这些键，另一些事件则没有）。当传入事件确实存在这些键时，则将键值写入接收器。

此架构定义充当筛选器，仅允许将定义的键发送到接收器，并从传入事件中删除未定义的键。

您也可以在接收器中使用 `include_keys` 和 `exclude_keys` 筛选路由到其他接收器的数据。两个筛选器互斥，因此在架构中一次只能使用一个筛选器。此外，不能在用户定义的架构中使用它们。

要使用上述筛选条件创建管道，请使用预先配置的接收器筛选条件蓝图。有关更多信息，请参阅 [使用蓝图](pipeline-blueprint.md)。

## 将 Amazon S3 跨账户作为源
<a name="fdsf"></a>

您可以通过 Amazon S3 授予跨账户访问权限，这样 OpenSearch Ingestion 管道就可以访问另一个账户中的 S3 存储桶作为来源。要启用跨账户访问，请参阅《Amazon S3 用户指南》中的 [Bucket owner granting cross-account bucket permissions](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html)**。授予访问权限后，请确保您的管道角色具有所需的权限。

然后可以使用 `bucket_owners` 创建管道，从而对用作源的 Amazon S3 存储桶启用跨账户访问：

```
s3-pipeline:
 source:
  s3:
   notification_type: "sqs"
   codec:
    csv:
     delimiter: ","
     quote_character: "\""
     detect_header: True
   sqs:
    queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue"
   bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999
   compression: "gzip"
```