

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden einer OpenSearch Ingestion-Pipeline mit Amazon S3
<a name="configure-client-s3"></a>

Mit OpenSearch Ingestion können Sie Amazon S3 als Quelle oder als Ziel verwenden. Wenn Sie Amazon S3 als Quelle verwenden, senden Sie Daten an eine OpenSearch Ingestion-Pipeline. Wenn Sie Amazon S3 als Ziel verwenden, schreiben Sie Daten aus einer OpenSearch Ingestion-Pipeline in einen oder mehrere S3-Buckets.

**Topics**
+ [Amazon S3 als Quelle](#s3-source)
+ [Amazon S3 als Ziel](#s3-destination)
+ [Amazon S3 Cross-Konto als Quelle](#fdsf)

## Amazon S3 als Quelle
<a name="s3-source"></a>

*Es gibt zwei Möglichkeiten, Amazon S3 als Quelle für die Datenverarbeitung zu verwenden — mit der *S3-SQS-Verarbeitung und mit geplanten Scans*.* 

Verwenden Sie die S3-SQS-Verarbeitung, wenn Sie Dateien fast in Echtzeit scannen möchten, nachdem sie in S3 geschrieben wurden. Sie können Amazon S3 S3-Buckets so konfigurieren, dass sie jedes Mal ein Ereignis auslösen, wenn ein Objekt im Bucket gespeichert oder geändert wird. Verwenden Sie einen einmaligen oder wiederkehrenden geplanten Scan, um Daten in einem S3-Bucket stapelweise zu verarbeiten. 

**Topics**
+ [Voraussetzungen](#s3-prereqs)
+ [Schritt 1: Konfigurieren Sie die Pipeline-Rolle](#s3-pipeline-role)
+ [Schritt 2: Erstellen Sie die Pipeline](#s3-pipeline)

### Voraussetzungen
<a name="s3-prereqs"></a>

[Um Amazon S3 als Quelle für eine OpenSearch Ingestion-Pipeline sowohl für einen geplanten Scan als auch für eine S3-SQS-Verarbeitung zu verwenden, erstellen Sie zunächst einen S3-Bucket.](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)

**Anmerkung**  
Wenn sich der S3-Bucket, der als Quelle in der OpenSearch Ingestion-Pipeline verwendet wird, in einem anderen befindet AWS-Konto, müssen Sie auch kontoübergreifende Leseberechtigungen für den Bucket aktivieren. Dadurch kann die Pipeline die Daten lesen und verarbeiten. Informationen zum Aktivieren kontoübergreifender Berechtigungen finden Sie unter [Bucket-Besitzer, der kontoübergreifende Bucket-Berechtigungen erteilt](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html) im *Amazon S3 S3-Benutzerhandbuch*.  
Wenn sich Ihre S3-Buckets in mehreren Konten befinden, verwenden Sie eine Map. `bucket_owners` Ein Beispiel finden Sie in der Dokumentation unter [Kontoübergreifender S3-Zugriff](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#cross-account-s3-access). OpenSearch

Um die S3-SQS-Verarbeitung einzurichten, müssen Sie außerdem die folgenden Schritte ausführen:

1. [Erstellen Sie eine Amazon SQS SQS-Warteschlange](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html).

1. [Aktivieren Sie Ereignisbenachrichtigungen](https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html) im S3-Bucket mit der SQS-Warteschlange als Ziel.

### Schritt 1: Konfigurieren Sie die Pipeline-Rolle
<a name="s3-pipeline-role"></a>

Im Gegensatz zu anderen Quell-Plugins, *die* Daten in eine Pipeline übertragen, verfügt das [S3-Quell-Plug-In](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) über eine lesebasierte Architektur, bei der die Pipeline Daten *aus* der Quelle bezieht. 

Damit eine Pipeline aus S3 lesen kann, müssen Sie daher eine Rolle in der S3-Quellkonfiguration der Pipeline angeben, die Zugriff sowohl auf den S3-Bucket als auch auf die Amazon SQS SQS-Warteschlange hat. Die Pipeline übernimmt diese Rolle, um Daten aus der Warteschlange zu lesen.

**Anmerkung**  
Die Rolle, die Sie in der S3-Quellkonfiguration angeben, muss die [Pipeline-Rolle]() sein. Daher muss Ihre Pipeline-Rolle zwei separate Berechtigungsrichtlinien enthalten — eine zum Schreiben in eine Senke und eine zum Abrufen aus der S3-Quelle. Sie müssen dasselbe `sts_role_arn` in allen Pipeline-Komponenten verwenden.

Die folgende Beispielrichtlinie zeigt die erforderlichen Berechtigungen für die Verwendung von S3 als Quelle:

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action":[
          "s3:ListBucket",
          "s3:GetBucketLocation",
          "s3:GetObject"
       ],
      "Resource": "arn:aws:s3:::{{amzn-s3-demo-bucket}}/*"
    },
    {
       "Effect":"Allow",
       "Action":"s3:ListAllMyBuckets",
       "Resource":"arn:aws:s3:::*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:{{us-east-1}}:{{111122223333}}:{{MyS3EventSqsQueue}}"
    }
  ]
}
```

------

 Sie müssen diese Berechtigungen an die IAM-Rolle anhängen, die Sie in der `sts_role_arn` Option in der Konfiguration des S3-Quell-Plug-ins angeben:

```
version: "2"
source:
  s3:
    ...
    aws:
      ...
processor:
  ...
sink:
  - opensearch:
      ...
```

### Schritt 2: Erstellen Sie die Pipeline
<a name="s3-pipeline"></a>

Nachdem Sie Ihre Berechtigungen eingerichtet haben, können Sie je nach Ihrem Amazon OpenSearch S3-Anwendungsfall eine Ingestion-Pipeline konfigurieren.

#### S3-SQS-Verarbeitung
<a name="s3-sqs-processing"></a>

Um die S3-SQS-Verarbeitung einzurichten, konfigurieren Sie Ihre Pipeline so, dass sie S3 als Quelle angibt, und richten Sie Amazon SQS-Benachrichtigungen ein:

```
version: "2"
s3-pipeline:
  source:
    s3:
      notification_type: "sqs"
      codec:
        newline: null
      sqs:
        queue_url: "https://sqs.{{us-east-1}}amazonaws.com/{{account-id}}/{{ingestion-queue}}"
      compression: "none"
      aws:
        region: "{{region}}"
  processor:
  - grok:
      match:
        message:
        - "%{COMMONAPACHELOG}"
  - date:
      destination: "@timestamp"
      from_time_received: true
  sink:
  - opensearch:
      hosts: ["https://search-{{domain-endpoint}}.{{us-east-1}}es.amazonaws.com"]
      index: "{{index-name}}"
      aws:
        region: "{{region}}"
```

Wenn Sie bei der Verarbeitung kleiner Dateien auf Amazon S3 eine geringe CPU-Auslastung feststellen, sollten Sie erwägen, den Durchsatz zu erhöhen, indem Sie den Wert der `workers` Option ändern. Weitere Informationen finden Sie in den [Konfigurationsoptionen des S3-Plug-ins](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/#configuration).

#### Geplanter Scan
<a name="s3-scheduled-scan"></a>

Um einen geplanten Scan einzurichten, konfigurieren Sie Ihre Pipeline mit einem Zeitplan auf Scanebene, der für alle Ihre S3-Buckets gilt, oder auf Bucket-Ebene. Ein Zeitplan auf Bucket-Ebene oder eine Konfiguration mit Scan-Intervallen überschreibt immer eine Konfiguration auf Scan-Ebene. 

Sie können geplante Scans entweder mit einem *einmaligen Scan* konfigurieren, der sich ideal für die Datenmigration eignet, oder mit einem *wiederkehrenden Scan*, der sich ideal für die Stapelverarbeitung eignet. 

Verwenden Sie die vorkonfigurierten Amazon S3-Blueprints, um Ihre Pipeline für das Lesen aus Amazon S3 zu konfigurieren. Sie können den `scan` Teil Ihrer Pipeline-Konfiguration bearbeiten, um Ihre Planungsanforderungen zu erfüllen. Weitere Informationen finden Sie unter [Mit Blueprints arbeiten](pipeline-blueprint.md).

**Einmaliger Scan**

Ein einmaliger geplanter Scan wird einmal ausgeführt. In Ihrer Pipeline-Konfiguration können Sie mit einem `start_time` und angeben`end_time`, wann die Objekte im Bucket gescannt werden sollen. Alternativ können `range` Sie das Zeitintervall im Verhältnis zur aktuellen Uhrzeit angeben, in dem die Objekte im Bucket gescannt werden sollen. 

Zum Beispiel ein Bereich, der so eingestellt ist, dass alle Dateien `PT4H` gescannt werden, die in den letzten vier Stunden erstellt wurden. Um einen einmaligen Scan so zu konfigurieren, dass er ein zweites Mal ausgeführt wird, müssen Sie die Pipeline beenden und neu starten. Wenn Sie keinen Bereich konfiguriert haben, müssen Sie auch die Start- und Endzeiten aktualisieren.

Die folgende Konfiguration richtet einen einmaligen Scan für alle Buckets und alle Objekte in diesen Buckets ein:

```
version: "2"
log-pipeline:
  source:
    s3:
      codec:
        csv:
      compression: "none"
      aws:
        region: "{{region}}"
      acknowledgments: true
      scan:
        buckets:
          - bucket:
              name: {{my-bucket}}
              filter:
                include_prefix:
                  - {{Objects1}}/
                exclude_suffix:
                  - .jpeg
                  - .png
          - bucket:
              name: {{my-bucket-2}}
              key_prefix:
                include:
                  - {{Objects2}}/
                exclude_suffix:
                  - .jpeg
                  - .png
      delete_s3_objects_on_read: false
  processor:
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: ["https://{{search-domain-endpoint}}.{{us-east-1}}es.amazonaws.com"]
        index: "{{index-name}}"
        aws:
          region: "{{region}}"
        dlq:
          s3:
            bucket: "{{dlq-bucket}}"
            region: "{{us-east-1}}"
```

Die folgende Konfiguration richtet einen einmaligen Scan für alle Buckets während eines bestimmten Zeitfensters ein. Das bedeutet, dass S3 nur die Objekte verarbeitet, deren Erstellungszeiten in dieses Fenster fallen.

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        name: {{my-bucket-1}}
        filter:
          include:
            - {{Objects1}}/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: {{my-bucket-2}}
        filter:
          include:
            - {{Objects2}}/
          exclude_suffix:
            - .jpeg
            - .png
```

Die folgende Konfiguration richtet einen einmaligen Scan sowohl auf Scan- als auch auf Bucket-Ebene ein. Start- und Endzeiten auf Bucket-Ebene haben Vorrang vor Start- und Endzeiten auf Scan-Ebene. 

```
scan:
  start_time: 2023-01-21T18:00:00.000Z
  end_time: 2023-04-21T18:00:00.000Z
  buckets:
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: {{my-bucket-1}}
        filter:
          include:
            - {{Objects1}}/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        start_time: 2023-01-21T18:00:00.000Z
        end_time: 2023-04-21T18:00:00.000Z
        name: {{my-bucket-2}}
        filter:
          include:
            - {{Objects2}}/
          exclude_suffix:
            - .jpeg
            - .png
```

Beim Stoppen einer Pipeline werden alle bereits vorhandenen Verweise darauf entfernt, welche Objekte vor dem Stopp von der Pipeline gescannt wurden. Wenn eine einzelne Scan-Pipeline gestoppt wird, werden alle Objekte nach dem Start erneut gescannt, auch wenn sie bereits gescannt wurden. Wenn Sie eine einzelne Scan-Pipeline beenden müssen, empfiehlt es sich, Ihr Zeitfenster zu ändern, bevor Sie die Pipeline erneut starten.

Wenn Sie Objekte nach Start- und Endzeit filtern müssen, ist das Stoppen und Starten der Pipeline die einzige Option. Wenn Sie nicht nach Start- und Endzeit filtern müssen, können Sie Objekte nach Namen filtern. Um nach Namen zu filtern, müssen Sie Ihre Pipeline nicht beenden und starten. Verwenden `include_prefix` Sie dazu und. `exclude_suffix`

**Wiederkehrender Scan**

Bei einem wiederkehrenden geplanten Scan werden Ihre angegebenen S3-Buckets in regelmäßigen, geplanten Intervallen gescannt. Sie können diese Intervalle nur auf Scanebene konfigurieren, da einzelne Konfigurationen auf Bucket-Ebene nicht unterstützt werden. 

In Ihrer Pipeline-Konfiguration `interval` gibt der die Häufigkeit des wiederkehrenden Scans an und kann zwischen 30 Sekunden und 365 Tagen liegen. Der erste dieser Scans erfolgt immer, wenn Sie die Pipeline erstellen. Das `count` definiert die Gesamtzahl der Scan-Instanzen.

Die folgende Konfiguration richtet einen wiederkehrenden Scan mit einer Verzögerung von 12 Stunden zwischen den Scans ein:

```
scan:
  scheduling:
    interval: PT12H
    count: 4
  buckets:
    - bucket:
        name: {{my-bucket-1}}
        filter:
          include:
            - {{Objects1}}/
          exclude_suffix:
            - .jpeg
            - .png
    - bucket:
        name: {{my-bucket-2}}
        filter:
          include:
            - {{Objects2}}/
          exclude_suffix:
            - .jpeg
            - .png
```

## Amazon S3 als Ziel
<a name="s3-destination"></a>

[Um Daten aus einer OpenSearch Ingestion-Pipeline in einen S3-Bucket zu schreiben, verwenden Sie den vorkonfigurierten S3-Blueprint, um eine Pipeline mit einer S3-Senke zu erstellen.](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/) Diese Pipeline leitet selektive Daten an eine OpenSearch Senke weiter und sendet gleichzeitig alle Daten zur Archivierung in S3. Weitere Informationen finden Sie unter [Mit Blueprints arbeiten](pipeline-blueprint.md).

Wenn Sie Ihre S3-Senke erstellen, können Sie Ihre bevorzugte Formatierung anhand einer Vielzahl von [Senken-Codecs](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sinks/s3/#codec) angeben. Wenn Sie beispielsweise Daten im Spaltenformat schreiben möchten, wählen Sie den Parquet- oder Avro-Codec. Wenn Sie ein zeilenbasiertes Format bevorzugen, wählen Sie JSON oder NDJSON. [Um Daten in einem bestimmten Schema nach S3 zu schreiben, können Sie mithilfe des Avro-Formats auch ein Inline-Schema innerhalb von Sink-Codecs definieren.](https://avro.apache.org/docs/current/specification/#schema-declaration) 

Das folgende Beispiel definiert ein Inline-Schema in einer S3-Senke:

```
- s3:
  codec:
    parquet:
      schema: >
        {
           "type" : "record",
           "namespace" : "org.vpcFlowLog.examples",
           "name" : "VpcFlowLog",
           "fields" : [
             { "name" : "version", "type" : "string"},
             { "name" : "srcport", "type": "int"},
             { "name" : "dstport", "type": "int"},
             { "name" : "start", "type": "int"},
             { "name" : "end", "type": "int"},
             { "name" : "protocol", "type": "int"},
             { "name" : "packets", "type": "int"},
             { "name" : "bytes", "type": "int"},
             { "name" : "action", "type": "string"},
             { "name" : "logStatus", "type" : "string"}
           ]
         }
```

Wenn Sie dieses Schema definieren, geben Sie eine Obermenge aller Schlüssel an, die in den verschiedenen Ereignistypen vorhanden sein könnten, die Ihre Pipeline an eine Senke übermittelt. 

Wenn bei einem Ereignis beispielsweise die Möglichkeit besteht, dass ein Schlüssel fehlt, fügen Sie diesen Schlüssel mit einem `null` Wert in Ihr Schema ein. Nullwertdeklarationen ermöglichen es dem Schema, uneinheitliche Daten zu verarbeiten (wobei einige Ereignisse diese Schlüssel haben und andere nicht). Wenn bei eingehenden Ereignissen diese Schlüssel vorhanden sind, werden ihre Werte in Senken geschrieben. 

Diese Schemadefinition fungiert als Filter, der nur das Senden definierter Schlüssel an Senken ermöglicht und undefinierte Schlüssel aus eingehenden Ereignissen löscht. 

Sie können auch `include_keys` und `exclude_keys` in Ihrer Senke verwenden, um Daten zu filtern, die an andere Senken weitergeleitet werden. Diese beiden Filter schließen sich gegenseitig aus, sodass Sie in Ihrem Schema jeweils nur einen Filter verwenden können. Darüber hinaus können Sie sie nicht in benutzerdefinierten Schemas verwenden. 

Verwenden Sie den vorkonfigurierten Senkenfilter-Blueprint, um Pipelines mit solchen Filtern zu erstellen. Weitere Informationen finden Sie unter [Mit Blueprints arbeiten](pipeline-blueprint.md).

## Amazon S3 Cross-Konto als Quelle
<a name="fdsf"></a>

Sie können bei Amazon S3 kontenübergreifenden Zugriff gewähren, sodass OpenSearch Ingestion-Pipelines auf S3-Buckets in einem anderen Konto als Quelle zugreifen können. Informationen zum Aktivieren des kontoübergreifenden Zugriffs finden Sie unter [Bucket-Besitzer, der kontoübergreifende Bucket-Berechtigungen erteilt](https://docs.aws.amazon.com/AmazonS3/latest/userguide/example-walkthroughs-managing-access-example2.html) im *Amazon S3 S3-Benutzerhandbuch*. Nachdem Sie den Zugriff gewährt haben, stellen Sie sicher, dass Ihre Pipeline-Rolle über die erforderlichen Berechtigungen verfügt.

Anschließend können Sie eine Pipeline erstellen, `bucket_owners` um den kontoübergreifenden Zugriff auf einen Amazon S3 S3-Bucket als Quelle zu ermöglichen:

```
s3-pipeline:
 source:
  s3:
   notification_type: "sqs"
   codec:
    csv:
     delimiter: ","
     quote_character: "\""
     detect_header: True
   sqs:
    queue_url: "https://sqs.ap-northeast-1.amazonaws.com/401447383613/test-s3-queue"
   bucket_owners:
    my-bucket-01: 123456789012
    my-bucket-02: 999999999999
   compression: "gzip"
```