

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verstehen Sie die Partitionierungsschlüssel
<a name="dynamic-partitioning-partitioning-keys"></a>

Mit dynamischem Partitionierungs-Mechanismus erstellen Sie gezielte Datensätze aus den Streaming-S3-Daten, indem sie auf der Grundlage von Partitionsschlüsseln partitioniert werden. Mit Partitionierungsschlüsseln können Sie Ihre Streaming-Daten anhand bestimmter Werte filtern. Wenn Sie Ihre Daten beispielsweise nach Kunden-ID und Land filtern müssen, können Sie das Datenfeld der `customer_id` als einen Partitionsschlüssel und das Datenfeld des `country` als einen weiteren Partitionsschlüssel angeben. Anschließend geben Sie die Ausdrücke (unter Verwendung der unterstützten Formate) an, um die S3-Bucket-Präfixe zu definieren, an die die dynamisch partitionierten Datensätze geliefert werden sollen. 

Sie können Partitionierungsschlüssel mit den folgenden Methoden erstellen.
+ **Inline-Parsing** — Diese Methode verwendet den integrierten Unterstützungsmechanismus von Firehose, einen [JQ-Parser](https://stedolan.github.io/jq/), zum Extrahieren der Schlüssel für die Partitionierung aus Datensätzen im JSON-Format. Derzeit unterstützen wir nur die Version. `jq 1.6`
+ **AWS Lambda-Funktion** — Diese Methode verwendet eine angegebene AWS Lambda Funktion, um die für die Partitionierung benötigten Datenfelder zu extrahieren und zurückzugeben.

**Wichtig**  
Wenn Sie die dynamische Partitionierung aktivieren, müssen Sie mindestens eine dieser Methoden konfigurieren, um Ihre Daten zu partitionieren. Sie können eine dieser Methoden konfigurieren, um Ihre Partitionierungsschlüssel anzugeben, oder beide gleichzeitig. 

## Erstellen Sie Partitionierungsschlüssel mit Inline-Parsing
<a name="dynamic-partitioning-inline-parsing"></a>

Um Inline-Parsing als dynamische Partitionierungsmethode für Ihre Streaming-Daten zu konfigurieren, müssen Sie Datensatzparameter auswählen, die als Partitionierungsschlüssel verwendet werden sollen, und für jeden angegebenen Partitionierungsschlüssel einen Wert angeben.

Der folgende Beispieldatensatz zeigt, wie Sie mit Inline-Parsing Partitionierungsschlüssel dafür definieren können. Beachten Sie, dass die Daten im Base64-Format codiert sein sollten. Sie können sich auch auf das [CLI-Beispiel](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples) beziehen.

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

Sie können beispielsweise wählen, ob Sie Ihre Daten auf der Grundlage des `customer_id`-Parameters oder des `event_timestamp`-Parameters partitionieren möchten. Das bedeutet, dass Sie möchten, dass der Wert des `customer_id`-Parameters oder des `event_timestamp`-Parameters in jedem Datensatz verwendet wird, um das S3-Präfix zu bestimmen, an das der Datensatz gesendet werden soll. Sie können auch einen verschachtelten Parameter wählen, z. B. `device` bei einem `.type.device`-Ausdruck. Ihre dynamische Partitionierungslogik kann von mehreren Parametern abhängen.

Nachdem Sie Datenparameter für Ihre Partitionierungsschlüssel ausgewählt haben, ordnen Sie jeden Parameter einem gültigen JQ-Ausdruck zu. Die folgende Tabelle zeigt eine solche Zuordnung von Parametern zu JQ-Ausdrücken:


| Parameter | jq-Ausdruck | 
| --- | --- | 
| customer\$1id | .customer\$1id | 
| device |  .type.device  | 
| year |  .event\$1timestamp\$1 strftime("%Y")  | 
| month |  .event\$1timestamp\$1 strftime("%m")  | 
| day |  .event\$1timestamp\$1 strftime("%d")  | 
| hour |  .event\$1timestamp\$1 strftime("%H")  | 

Zur Laufzeit verwendet Firehose die rechte Spalte oben, um die Parameter auf der Grundlage der Daten in den einzelnen Datensätzen auszuwerten.

## Erstellen Sie Partitionierungsschlüssel mit einer Funktion AWS Lambda
<a name="dynamic-partitioning-with-lambda"></a>

Für komprimierte oder verschlüsselte Datensätze oder Daten, die in einem anderen Dateiformat als JSON vorliegen, können Sie die integrierte AWS Lambda Funktion mit Ihrem eigenen benutzerdefinierten Code verwenden, um die Datensätze zu dekomprimieren, zu entschlüsseln oder zu transformieren, um die für die Partitionierung benötigten Datenfelder zu extrahieren und zurückzugeben. Dies ist eine Erweiterung der bestehenden Transform-Lambda-Funktion, die heute mit Firehose verfügbar ist. Sie können die Datenfelder transformieren, analysieren und zurückgeben, die Sie dann mit derselben Lambda-Funktion für die dynamische Partitionierung verwenden können.

Im Folgenden finden Sie ein Beispiel für eine Lambda-Funktion zur Firehose-Stream-Verarbeitung in Python, die jeden gelesenen Datensatz von der Eingabe bis zur Ausgabe wiedergibt und Partitionierungsschlüssel aus den Datensätzen extrahiert.

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

Im Folgenden finden Sie ein Beispiel für eine Lambda-Funktion zur Firehose-Stream-Verarbeitung in Go, die jeden gelesenen Datensatz von der Eingabe bis zur Ausgabe wiedergibt und Partitionierungsschlüssel aus den Datensätzen extrahiert.

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```