

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 了解分割金鑰
<a name="dynamic-partitioning-partitioning-keys"></a>

透過動態分割，您可以根據分割索引鍵分割資料，從串流 S3 資料建立目標資料集。分割索引鍵可讓您根據特定值篩選串流資料。例如，如果您需要根據客戶 ID 和國家/地區篩選資料，可以將 `customer_id` 的資料欄位指定為一個分割索引鍵，而將 `country` 的資料欄位指定為另一個分割索引鍵。然後，您可以指定運算式 (使用支援的格式) 來定義要傳送動態分割資料記錄的 S3 儲存貯體字首。

您可以使用下列方法建立分割金鑰。
+ **內嵌剖析** – 此方法使用 Firehose 內建支援機制 [jq 剖析器](https://stedolan.github.io/jq/)，從 JSON 格式的資料記錄擷取分割金鑰。目前，我們僅支援 `jq 1.6`版本。
+ **AWS Lambda 函數** – 此方法使用指定的 AWS Lambda 函數來擷取和傳回分割所需的資料欄位。

**重要**  
啟用動態分割時，您必須至少設定下列一種方法，以分割您的資料。您可以設定其中一種方法來同時指定您的分割索引鍵，或同時指定兩個分割索引鍵。

## 使用內嵌剖析建立分割金鑰
<a name="dynamic-partitioning-inline-parsing"></a>

若要將內嵌剖析設定為串流資料的動態分割方法，您必須選擇要用作分割索引鍵的資料記錄參數，並為每個指定的分割索引鍵提供一個值。

下列範例資料記錄顯示如何使用內嵌剖析為其定義分割索引鍵。請注意，資料應以 Base64 格式編碼。您也可以參考 [CLI 範例](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples)。

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

例如，您可以選擇根據 `customer_id` 參數或 `event_timestamp` 參數對資料進行分割。這表示您希望在決定要交付記錄的 S3 字首時，使用每個記錄中的 `customer_id` 參數或 `event_timestamp` 參數值。您也可以選擇巢狀參數，就像使用運算式 `.type.device` 的 `device` 一樣。您的動態分割邏輯可以依賴於多個參數。

選取分割索引鍵的資料參數之後，您可以將每個參數映射至有效的 jq 運算式。下表顯示了參數到 jq 運算式的映射：


| 參數 | jq 運算式 | 
| --- | --- | 
| customer\$1id | .customer\$1id | 
| device |  .type.device  | 
| year |  .event\$1timestamp\$1 strftime("%Y")  | 
| month |  .event\$1timestamp\$1 strftime("%m")  | 
| day |  .event\$1timestamp\$1 strftime("%d")  | 
| hour |  .event\$1timestamp\$1 strftime("%H")  | 

在執行時間，Firehose 會使用上面的右欄，根據每個記錄中的資料來評估參數。

## 使用 AWS Lambda 函數建立分割金鑰
<a name="dynamic-partitioning-with-lambda"></a>

對於壓縮或加密的資料記錄，或 JSON 以外的任何檔案格式的資料，您可以使用整合式 AWS Lambda 函數搭配您自己的自訂程式碼來解壓縮、解密或轉換記錄，以擷取和傳回分割所需的資料欄位。這是目前 Firehose 提供的現有轉換 Lambda 函數的擴展。您可以轉換、剖析和傳回資料欄位，然後使用相同的 Lambda 函數來進行動態分割。

以下是 Python 中的 Firehose 串流處理 Lambda 函數範例，該函數會將每個讀取記錄從輸入重播至輸出，並從記錄擷取分割金鑰。

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

以下是 Go 中 Firehose 串流處理 Lambda 函數的範例，該函數會將每個讀取記錄從輸入重播至輸出，並從記錄擷取分割金鑰。

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```