

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 了解分区键
<a name="dynamic-partitioning-partitioning-keys"></a>

通过动态分区，您可以根据分区键对数据进行分区，从 S3 流数据创建目标数据集。分区键使您能够根据特定值筛选流数据。例如，如果您需要根据客户 ID 和国家/地区筛选数据，则可以将 `customer_id` 的数据字段指定为一个分区键，将 `country` 的数据字段指定为另一个分区键。然后，指定表达式（使用支持的格式）来定义动态分区数据记录要传输到的 S3 存储桶前缀。

您可以使用以下方法来创建分区键。
+ **内联解析**：此方法使用 Firehose 内置支持机制（[jq 解析器](https://stedolan.github.io/jq/)），从 JSON 格式的数据记录中提取用于分区的键。目前，我们仅支持 `jq 1.6` 版本。
+ **AWS Lambda 函数** — 此方法使用指定的 AWS Lambda 函数提取并返回分区所需的数据字段。

**重要**  
启用动态分区时，必须至少配置其中一种方法来对数据进行分区。您可以配置其中一种方法来指定分区键，也可以同时配置这两种方法。

## 使用内联解析创建分区键
<a name="dynamic-partitioning-inline-parsing"></a>

要将内联解析配置为流数据的动态分区方法，必须选择要用作分区键的数据记录参数，并为每个指定的分区键提供一个值。

以下示例数据记录显示如何通过内联解析为其定义分区键。请注意，应以 Base64 格式对数据进行编码。您也可以参考 [CLI 示例](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples)。

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

例如，您可以选择根据 `customer_id` 参数或 `event_timestamp` 参数对数据进行分区。这意味着您希望使用每条记录中的 `customer_id` 参数或 `event_timestamp` 参数的值来确定要向其传输传递记录的 S3 前缀。您也可以选择嵌套参数，例如表达式为 `.type.device` 的 `device`。您的动态分区逻辑可能取决于多个参数。

为分区键选择数据参数后，将每个参数映射到有效的 jq 表达式。下表显示了参数到 jq 表达式的映射：


| 参数 | jq 表达式 | 
| --- | --- | 
| customer\$1id | .customer\$1id | 
| device |  .type.device  | 
| year |  .event\$1timestamp\$1 strftime("%Y")  | 
| month |  .event\$1timestamp\$1 strftime("%m")  | 
| day |  .event\$1timestamp\$1 strftime("%d")  | 
| hour |  .event\$1timestamp\$1 strftime("%H")  | 

在运行时系统，Firehose 使用上面的右列根据每条记录中的数据来评估参数。

## 使用函数创建分区密 AWS Lambda 钥
<a name="dynamic-partitioning-with-lambda"></a>

对于压缩或加密的数据记录，或除 JSON 以外的任何文件格式的数据，您可以使用集成 AWS Lambda 函数和自己的自定义代码来解压、解密或转换记录，以便提取和返回分区所需的数据字段。这是对现有转换 Lambda 函数的扩展，该函数现已随 Firehose 一起提供。您可以转换、解析和返回数据字段，然后使用相同的 Lambda 函数进行动态分区。

以下是一个用 Python 编写的 Firehose 流处理 Lambda 函数示例，该函数从输入到输出重放每一条读取记录，并从记录中提取分区键。

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

以下是一个用 Go 编写的 Firehose 流处理 Lambda 函数示例，该函数从输入到输出重放每一条读取记录，并从记录中提取分区键。

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```