

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Comprendi le chiavi di partizionamento
<a name="dynamic-partitioning-partitioning-keys"></a>

Con il partizionamento dinamico, crei set di dati mirati dai dati S3 in streaming partizionandoli in base alle chiavi di partizionamento. Le chiavi di partizionamento consentono di filtrare i dati in streaming in base a valori specifici. Ad esempio, se è necessario filtrare i dati in base all'ID cliente e al paese, è possibile specificare il campo dati `customer_id` come una chiave di partizionamento e il campo dati `country` come un'altra chiave di partizionamento. Quindi, specificare le espressioni (utilizzando i formati supportati) per definire i prefissi dei bucket S3 a cui devono essere distribuiti i record di dati partizionati in modo dinamico. 

È possibile creare chiavi di partizionamento con i seguenti metodi.
+ Analisi **in linea: questo metodo utilizza** il meccanismo di supporto integrato di Firehose, un [parser jq](https://stedolan.github.io/jq/), per estrarre le chiavi per il partizionamento dai record di dati in formato JSON. Al `jq 1.6` momento, supportiamo solo la versione.
+ **AWS Funzione Lambda**: questo metodo utilizza una AWS Lambda funzione specificata per estrarre e restituire i campi di dati necessari per il partizionamento.

**Importante**  
Quando abiliti il partizionamento dinamico, devi configurare almeno uno di questi metodi per partizionare i dati. Puoi configurare uno di questi metodi per specificare le chiavi di partizionamento o entrambi contemporaneamente. 

## Crea chiavi di partizionamento con analisi in linea
<a name="dynamic-partitioning-inline-parsing"></a>

Per configurare l'analisi in linea come metodo di partizionamento dinamico per i dati di streaming, è necessario scegliere i parametri del record di dati da utilizzare come chiavi di partizionamento e fornire un valore per ogni chiave di partizionamento specificata.

Il seguente record di dati di esempio mostra come definire le relative chiavi di partizionamento con l'analisi in linea. Nota che i dati devono essere codificati nel formato Base64. Puoi anche fare riferimento all'esempio [CLI](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples).

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

Ad esempio, puoi scegliere di partizionare i dati in base al parametro `customer_id` o al parametro `event_timestamp`. Ciò significa che desideri che il valore del parametro `customer_id` o del parametro `event_timestamp` in ogni record venga utilizzato per determinare il prefisso S3 a cui deve essere distribuito il record. Puoi anche scegliere un parametro nidificato, ad esempio `device` con un'espressione `.type.device`. La logica di partizionamento dinamico può dipendere da più parametri.

Dopo aver selezionato i parametri dei dati per le chiavi di partizionamento, mappa ogni parametro a un'espressione jq valida. La tabella seguente mostra una tale mappatura dei parametri alle espressioni jq:


| Parametro | espressione jq | 
| --- | --- | 
| customer\$1id | .customer\$1id | 
| device |  .type.device  | 
| year |  .event\$1timestamp\$1 strftime("%Y")  | 
| month |  .event\$1timestamp\$1 strftime("%m")  | 
| day |  .event\$1timestamp\$1 strftime("%d")  | 
| hour |  .event\$1timestamp\$1 strftime("%H")  | 

In fase di esecuzione, Firehose utilizza la colonna destra in alto per valutare i parametri in base ai dati di ogni record.

## Crea chiavi di partizionamento con una funzione AWS Lambda
<a name="dynamic-partitioning-with-lambda"></a>

Per i record di dati compressi o crittografati o i dati in qualsiasi formato di file diverso da JSON, puoi utilizzare la AWS Lambda funzione integrata con il tuo codice personalizzato per decomprimere, decrittografare o trasformare i record al fine di estrarre e restituire i campi di dati necessari per il partizionamento. Si tratta di un'espansione della funzione di trasformazione Lambda esistente oggi disponibile con Firehose. Puoi quindi trasformare, analizzare e restituire i campi di dati da utilizzare quindi per il partizionamento dinamico usando la stessa funzione Lambda.

Di seguito è riportato un esempio di funzione Lambda di elaborazione del flusso Firehose in Python che riproduce ogni record letto dall'input all'output ed estrae le chiavi di partizionamento dai record.

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

Di seguito è riportato un esempio di funzione Lambda di elaborazione del flusso Firehose in Go che riproduce ogni record letto dall'input all'output ed estrae le chiavi di partizionamento dai record.

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```