

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 Amazon S3 使用 OpenSearch 擷取管道
<a name="configure-client-s3"></a>

使用 OpenSearch Ingestion，您可以使用 Amazon S3 做為來源或目的地。當您使用 Amazon S3 做為來源時，會將資料傳送至 OpenSearch Ingestion 管道。當您使用 Amazon S3 做為目的地時，您可以將資料從 OpenSearch 擷取管道寫入一或多個 S3 儲存貯體。

**Topics**
+ [Amazon S3 做為來源](#s3-source)
+ [Amazon S3 做為目的地](#s3-destination)
+ [Amazon S3 跨帳戶做為來源](#fdsf)

## Amazon S3 做為來源
<a name="s3-source"></a>

有兩種方式可以使用 Amazon S3 作為來源來處理資料：透過 *S3-SQS 處理*和*排程掃描*。

當您在檔案寫入 S3 後需要近乎即時的檔案掃描時，請使用 S3-SQS 處理。 S3 您可以將 Amazon S3 儲存貯體設定為在儲存貯體中存放或修改物件時引發事件。使用一次性或重複排程掃描來批次處理 S3 儲存貯體中的資料。

**Topics**
+ [先決條件](#s3-prereqs)
+ [步驟 1：設定管道角色](#s3-pipeline-role)
+ [步驟 2：建立管道](#s3-pipeline)

### 先決條件
<a name="s3-prereqs"></a>

若要使用 Amazon S3 作為排程掃描或 S3-SQS 處理之 OpenSearch Ingestion 管道的來源，請先[建立 S3 儲存貯](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)體。

**注意**  
如果做為 OpenSearch Ingestion 管道來源的 S3 儲存貯體位於不同的 中 AWS 帳戶，您也需要在儲存貯體上啟用跨帳戶讀取許可。這可讓管道讀取和處理資料。若要啟用跨帳戶許可，請參閱《*Amazon S3 使用者指南*》中的[授予跨帳戶儲存貯體許可的儲存貯體擁有者](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html)。  
如果您的 S3 儲存貯體位於多個帳戶中，請使用`bucket_owners`映射。如需範例，請參閱 OpenSearch 文件中的[跨帳戶 S3 存取](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#cross-account-s3-access)。

若要設定 S3-SQS 處理，您也需要執行下列步驟：

1. [建立 Amazon SQS 佇列](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html)。

1. 在目的地為 SQS 佇列的 S3 儲存貯體上[啟用事件通知](https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html)。

### 步驟 1：設定管道角色
<a name="s3-pipeline-role"></a>

與其他將資料*推送*至管道的來源外掛程式不同，[S3 來源外掛程式](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/)具有讀取型架構，其中管道會從來源*提取*資料。

因此，若要從 S3 讀取管道，您必須在管道的 S3 來源組態中指定可存取 S3 儲存貯體和 Amazon SQS 佇列的角色。管道將擔任此角色，以便從佇列讀取資料。

**注意**  
您在 S3 來源組態中指定的角色必須是[管道角色]()。因此，您的管道角色必須包含兩個單獨的許可政策，一個用於寫入接收器，另一個用於從 S3 來源提取。您必須在所有管道元件`sts_role_arn`中使用相同的 。

下列範例政策顯示使用 S3 做為來源的必要許可：

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action":[
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:GetObject"
       ],
      "Resource": "arn:aws:s3:::amzn-s3-demo-bucket/*"
    },
    {
       "Effect":"Allow",
       "Action":"s3:ListAllMyBuckets",
       "Resource":"arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:us-east-1:111122223333:MyS3EventSqsQueue"
    }
  ]
}
```

------

 您必須將這些許可連接到您在 S3 來源外掛程式組態中的 `sts_role_arn`選項中指定的 IAM 角色：

```
version: "2"
source:
  s3:
    ...
    aws:
      ...
processor:
  ...
sink:
  - opensearch:
      ...
```

### 步驟 2：建立管道
<a name="s3-pipeline"></a>

設定許可後，您可以根據 Amazon S3 使用案例設定 OpenSearch Ingestion 管道。

#### S3-SQS 處理
<a name="s3-sqs-processing"></a>

若要設定 S3-SQS 處理，請設定管道以指定 S3 做為來源，並設定 Amazon SQS 通知：

```
version: "2"
s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline: null
      sqs:
        queue_url: "https://sqs.us-east-1amazonaws.com/account-id/ingestion-queue"
      compression: "none"
      aws:
        region: "region"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
      index: "index-name"
      aws:
        region: "region"
```

如果您在 Amazon S3 上處理小型檔案時觀察到低 CPU 使用率，請考慮透過修改 `workers`選項的值來增加輸送量。如需詳細資訊，請參閱 [S3 外掛程式組態選項](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#configuration)。

#### 排程掃描
<a name="s3-scheduled-scan"></a>

若要設定排程掃描，請在套用至所有 S3 儲存貯體的掃描層級或儲存貯體層級使用排程來設定管道。儲存貯體層級排程或掃描間隔組態一律會覆寫掃描層級組態。

您可以使用適合資料遷移的*一次性掃描*，或適合批次處理的*週期性掃描*來設定排程掃描。

若要將管道設定為從 Amazon S3 讀取，請使用預先設定的 Amazon S3 藍圖。您可以編輯管道組態`scan`的部分，以符合排程需求。如需詳細資訊，請參閱[使用藍圖](pipeline-blueprint.md)。

**一次性掃描**

一次性排程掃描執行一次。在管道組態中，您可以使用 `start_time`和 `end_time` 來指定何時掃描儲存貯體中的物件。或者，您可以使用 `range` 來指定相對於您希望掃描儲存貯體中物件之目前時間的時間間隔。

例如，設定為`PT4H`掃描過去四小時內建立的所有檔案的範圍。若要設定一次性掃描以執行第二次，您必須停止並重新啟動管道。如果您沒有設定範圍，您還必須更新開始和結束時間。

下列組態會為這些儲存貯體中的所有儲存貯體和所有物件設定一次性掃描：

```
version: "2"
log-pipeline:
  source:
    s3:
      codec:
        csv:
      compression: "none"
      aws:
        region: "region"
      acknowledgments: true
      scan:
        buckets:
          - bucket:
              name: my-bucket
              filter:
                include_prefix:
                  - Objects1/
                exclude_suffix:
                  - .jpeg
                  - .png
          - bucket:
              name: my-bucket-2
              key_prefix:
                include:
                  - Objects2/
                exclude_suffix:
                  - .jpeg
                  - .png
      delete_s3_objects_on_read: false
  processor:
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"]
        index: "index-name"
        aws:
          region: "region"
        dlq:
          s3:
            bucket: "dlq-bucket"
            region: "us-east-1"
```

下列組態會為指定時段中的所有儲存貯體設定一次性掃描。這表示 S3 只會處理建立時間落在此時段內的物件。

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

下列組態會在掃描層級和儲存貯體層級設定一次性掃描。儲存貯體層級的開始和結束時間覆寫掃描層級的開始和結束時間。

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

停止管道會移除管道在停止之前已掃描哪些物件的任何預先存在參考。如果單一掃描管道停止，它將在啟動之後重新掃描所有物件，即使它們已經掃描。如果您需要停止單一掃描管道，建議您在再次啟動管道之前變更時間範圍。

如果您需要依開始時間和結束時間篩選物件，則停止和啟動管道是唯一的選項。如果您不需要依開始時間和結束時間篩選，您可以依名稱篩選物件。依名稱轉譯不需要您停止和啟動管道。若要這樣做，請使用 `include_prefix`和 `exclude_suffix`。

**重複掃描**

定期排程掃描會以定期排程的間隔執行指定 S3 儲存貯體的掃描。您只能在掃描層級設定這些間隔，因為不支援個別儲存貯體層級組態。

在您的管道組態中， `interval`會指定重複掃描的頻率，而且可以介於 30 秒到 365 天之間。這些掃描的第一個一律會在您建立管道時發生。`count` 定義掃描執行個體的總數。

下列組態會設定重複掃描，掃描之間延遲 12 小時：

```
scan:
  scheduling:
    interval: PT12H
    count: 4
  buckets:
    - bucket:
        name: my-bucket-1
        filter:
          include:
            - Objects1/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: my-bucket-2
        filter:
          include:
            - Objects2/
          exclude_suffix:
            - .jpeg
            - .png
```

## Amazon S3 做為目的地
<a name="s3-destination"></a>

若要將 OpenSearch Ingestion 管道的資料寫入 S3 儲存貯體，請使用預先設定的 S3 藍圖來建立具有 [S3 接收器](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/)的管道。此管道會將選擇性資料路由至 OpenSearch 接收器，並同時傳送所有資料以供 S3 中封存。如需詳細資訊，請參閱[使用藍圖](pipeline-blueprint.md)。

建立 S3 接收器時，您可以從各種[接收器轉碼器](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/#codec)指定偏好的格式。例如，如果您想要以單欄式格式寫入資料，請選擇 Parquet 或 Avro 轉碼器。如果您偏好以資料列為基礎的格式，請選擇 JSON 或 NDJSON。若要在指定的結構描述中將資料寫入 S3，您也可以使用 [Avro 格式](https://avro.apache.org/docs/current/specification/#schema-declaration)在接收器轉碼器中定義內嵌結構描述。

下列範例定義 S3 接收器中的內嵌結構描述：

```
- s3:
  codec:
    parquet:
      schema: >
        {
           "type" : "record",
           "namespace" : "org.vpcFlowLog.examples",
           "name" : "VpcFlowLog",
           "fields" : [
             { "name" : "version", "type" : "string"},
             { "name" : "srcport", "type": "int"},
             { "name" : "dstport", "type": "int"},
             { "name" : "start", "type": "int"},
             { "name" : "end", "type": "int"},
             { "name" : "protocol", "type": "int"},
             { "name" : "packets", "type": "int"},
             { "name" : "bytes", "type": "int"},
             { "name" : "action", "type": "string"},
             { "name" : "logStatus", "type" : "string"}
           ]
         }
```

當您定義此結構描述時，請指定可能存在於管道交付至接收器之不同類型事件的所有金鑰超級集。

例如，如果事件可能遺失索引鍵，請在結構描述中將該索引鍵加上 `null`值。Null 值宣告允許結構描述處理不均勻的資料 （其中某些事件具有這些索引鍵，而其他則否）。當傳入事件確實存在這些索引鍵時，其值會寫入目的地。

此結構描述定義可做為篩選條件，僅允許將定義的金鑰傳送至目的地，並從傳入事件捨棄未定義的金鑰。

您也可以在目的地`exclude_keys`中使用 `include_keys`和 來篩選路由到其他目的地的資料。這兩個篩選條件是互斥的，因此您一次只能在結構描述中使用一個。此外，您無法在使用者定義的結構描述中使用它們。

若要使用此類篩選條件建立管道，請使用預先設定的目的地篩選條件藍圖。如需詳細資訊，請參閱[使用藍圖](pipeline-blueprint.md)。

## Amazon S3 跨帳戶做為來源
<a name="fdsf"></a>

您可以使用 Amazon S3 跨帳戶授予存取權，以便 OpenSearch Ingestion 管道可以存取另一個帳戶中的 S3 儲存貯體作為來源。若要啟用跨帳戶存取，請參閱《*Amazon S3 使用者指南*》中的[授予跨帳戶儲存貯體許可的儲存貯體擁有者](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html)。授予存取權之後，請確定您的管道角色具有必要的許可。

然後，您可以使用 建立管道`bucket_owners`，以啟用 Amazon S3 儲存貯體的跨帳戶存取做為來源：

```
s3-pipeline:
 source:
  s3:
   notification_type: "sqs"
   codec:
    csv:
     delimiter: ","
     quote_character: "\""
     detect_header: True
   sqs:
    queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue"
   bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999
   compression: "gzip"
```