

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Noções básicas de chaves de particionamento
<a name="dynamic-partitioning-partitioning-keys"></a>

Com o particionamento dinâmico, você cria conjuntos de dados direcionados a partir dos dados do S3 em streaming particionando os dados com base em chaves de particionamento. As chaves de particionamento permitem que você filtre os dados em streaming com base em valores específicos. Por exemplo, se você precisar filtrar os dados com base no ID do cliente e no país, poderá especificar o campo de dados de `customer_id` como uma chave de particionamento e o campo de dados de `country` como outra chave de particionamento. Em seguida, você especifica as expressões (usando os formatos com suporte) para definir os prefixos de bucket do S3 aos quais os registros de dados particionados dinamicamente devem ser entregues. 

É possível criar chaves de particionamento com os métodos a seguir.
+ **Análise em linha**: esse método usa o mecanismo de suporte integrado do Firehose, um [analisador jq](https://stedolan.github.io/jq/), para extrair as chaves para particionamento dos registros de dados que estão no formato JSON. Atualmente, oferecemos suporte apenas à versão `jq 1.6`.
+ **AWS Função Lambda** — esse método usa uma AWS Lambda função especificada para extrair e retornar os campos de dados necessários para o particionamento.

**Importante**  
Ao habilitar o particionamento dinâmico, você deve configurar pelo menos um desses métodos para particionar seus dados. É possível configurar qualquer um desses métodos para especificar as chaves de particionamento ou ambos ao mesmo tempo. 

## Criação de chaves de particionamento com análise em linha
<a name="dynamic-partitioning-inline-parsing"></a>

Para configurar a análise em linha como o método de particionamento dinâmico para os dados em streaming, você deve escolher os parâmetros de registro de dados a serem usados como chaves de particionamento e fornecer um valor para cada chave de particionamento especificada.

O exemplo de registro de dados a seguir mostra como é possível definir chaves de particionamento para ele com análise em linha. Observe que os dados devem ser codificados no formato Base64. Você também pode consultar o [exemplo da CLI](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples).

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

Por exemplo, é possível escolher particionar os dados com base no parâmetro `customer_id` ou no parâmetro `event_timestamp`. Isso significa que você deseja que o valor do parâmetro `customer_id` ou do parâmetro `event_timestamp` em cada registro seja usado para determinar o prefixo do S3 ao qual o registro deve ser entregue. Você também pode escolher um parâmetro aninhado, como `device` com uma expressão `.type.device`. A lógica de particionamento dinâmico pode depender de vários parâmetros.

Depois de selecionar os parâmetros dos dados para as chaves de particionamento, você mapeia cada parâmetro para uma expressão jq válida. A tabela a seguir mostra esse mapeamento de parâmetros para expressões jq:


| Parâmetro | Expressão jq | 
| --- | --- | 
| customer\$1id | .customer\$1id | 
| device |  .type.device  | 
| year |  .event\$1timestamp\$1 strftime("%Y")  | 
| month |  .event\$1timestamp\$1 strftime("%m")  | 
| day |  .event\$1timestamp\$1 strftime("%d")  | 
| hour |  .event\$1timestamp\$1 strftime("%H")  | 

No runtime, o Firehose usa a coluna direita acima para avaliar os parâmetros com base nos dados de cada registro.

## Crie chaves de particionamento com uma função AWS Lambda
<a name="dynamic-partitioning-with-lambda"></a>

Para registros de dados compactados ou criptografados, ou dados em qualquer formato de arquivo que não seja JSON, você pode usar a AWS Lambda função integrada com seu próprio código personalizado para descompactar, descriptografar ou transformar os registros a fim de extrair e retornar os campos de dados necessários para o particionamento. Essa é uma expansão da função do Lambda de transformação existente que está disponível atualmente com o Firehose. É possível transformar, analisar e retornar os campos de dados que podem ser usados para particionamento dinâmico usando a mesma função do Lambda.

Veja a seguir um exemplo de função do Lambda de processamento de fluxo do Firehose em Python que reproduz cada registro lido da entrada na saída e extrai as chaves de particionamento dos registros.

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

Veja a seguir um exemplo de função do Lambda de processamento de fluxo do Firehose em Go que reproduz cada registro lido da entrada na saída e extrai as chaves de particionamento dos registros.

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```