

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Memahami kunci partisi
<a name="dynamic-partitioning-partitioning-keys"></a>

Dengan partisi dinamis, Anda membuat kumpulan data yang ditargetkan dari data streaming S3 dengan mempartisi data berdasarkan kunci partisi. Tombol partisi memungkinkan Anda memfilter data streaming berdasarkan nilai tertentu. Misalnya, jika Anda perlu memfilter data berdasarkan ID pelanggan dan negara, Anda dapat menentukan bidang data `customer_id` sebagai satu kunci partisi dan bidang data `country` sebagai kunci partisi lainnya. Kemudian, Anda menentukan ekspresi (menggunakan format yang didukung) untuk menentukan awalan bucket S3 yang akan dikirimkan rekaman data yang dipartisi secara dinamis. 

Anda dapat membuat kunci partisi dengan metode berikut.
+ **Parsing inline** - metode ini menggunakan mekanisme dukungan built-in Firehose, [parser jq](https://stedolan.github.io/jq/), untuk mengekstrak kunci untuk partisi dari catatan data yang dalam format JSON. Saat ini, kami hanya mendukung `jq 1.6` versi.
+ **AWS Fungsi Lambda** — metode ini menggunakan AWS Lambda fungsi tertentu untuk mengekstrak dan mengembalikan bidang data yang diperlukan untuk partisi.

**penting**  
Saat Anda mengaktifkan partisi dinamis, Anda harus mengonfigurasi setidaknya satu dari metode ini untuk mempartisi data Anda. Anda dapat mengonfigurasi salah satu metode ini untuk menentukan kunci partisi Anda atau keduanya secara bersamaan. 

## Buat kunci partisi dengan penguraian sebaris
<a name="dynamic-partitioning-inline-parsing"></a>

Untuk mengonfigurasi penguraian sebaris sebagai metode partisi dinamis untuk data streaming Anda, Anda harus memilih parameter catatan data yang akan digunakan sebagai kunci partisi dan memberikan nilai untuk setiap kunci partisi yang ditentukan.

Catatan data sampel berikut menunjukkan bagaimana Anda dapat menentukan kunci partisi untuk itu dengan parsing inline. Perhatikan bahwa data harus dikodekan dalam format Base64. Anda juga dapat merujuk ke contoh [CLI](https://docs.aws.amazon.com/cli/latest/reference/firehose/put-record.html#examples).

```
{  
   "type": {  
    "device": "mobile",  
    "event": "user_clicked_submit_button" 
  },  
  "customer_id": "1234567890",  
  "event_timestamp": 1565382027,   #epoch timestamp  
  "region": "sample_region"  
}
```

Misalnya, Anda dapat memilih untuk mempartisi data Anda berdasarkan `customer_id` parameter atau `event_timestamp` parameter. Ini berarti Anda ingin nilai `customer_id` parameter atau `event_timestamp` parameter di setiap catatan digunakan dalam menentukan awalan S3 yang akan dikirimkan catatan. Anda juga dapat memilih parameter bersarang, seperti `device` dengan ekspresi`.type.device`. Logika partisi dinamis Anda dapat bergantung pada beberapa parameter.

Setelah memilih parameter data untuk kunci partisi Anda, Anda kemudian memetakan setiap parameter ke ekspresi jq yang valid. Tabel berikut menunjukkan pemetaan parameter ke ekspresi jq:


| Parameter | ekspresi jq | 
| --- | --- | 
| customer\_id | .customer\_id | 
| device | .type.perangkat | 
| year | .event\_timestamp\| strftime (“%Y”) | 
| month | .event\_timestamp\| strftime (“%m”) | 
| day | .event\_timestamp\| strftime (“%d”) | 
| hour | .event\_timestamp\| strftime (“%H”) | 

Saat runtime, Firehose menggunakan kolom kanan di atas untuk mengevaluasi parameter berdasarkan data di setiap record.

## Buat tombol partisi dengan fungsi AWS Lambda
<a name="dynamic-partitioning-with-lambda"></a>

Untuk catatan data terkompresi atau terenkripsi, atau data yang dalam format file apa pun selain JSON, Anda dapat menggunakan AWS Lambda fungsi terintegrasi dengan kode kustom Anda sendiri untuk mendekompresi, mendekripsi, atau mengubah catatan untuk mengekstrak dan mengembalikan bidang data yang diperlukan untuk partisi. Ini adalah perluasan dari fungsi transformasi Lambda yang ada yang tersedia saat ini dengan Firehose. Anda dapat mengubah, mengurai, dan mengembalikan bidang data yang kemudian dapat Anda gunakan untuk partisi dinamis menggunakan fungsi Lambda yang sama.

Berikut ini adalah contoh Firehose stream processing fungsi Lambda di Python yang memutar ulang setiap catatan baca dari input ke output dan mengekstrak kunci partisi dari catatan.

```
from __future__ import print_function
import base64
import json
import datetime
 
# Signature for all Lambda functions that user must implement
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']
          + ", Region: " + firehose_records_input['region']
          + ", and InvocationId: " + firehose_records_input['invocationId'])
 
    # Create return value.
    firehose_records_output = {'records': []}
 
    # Create result object.
    # Go through records and process them
 
    for firehose_record_input in firehose_records_input['records']:
        # Get user payload
        payload = base64.b64decode(firehose_record_input['data'])
        json_value = json.loads(payload)
 
        print("Record that was received")
        print(json_value)
        print("\n")
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {}
        event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp'])
        partition_keys = {"customerId": json_value['customerId'],
                          "year": event_timestamp.strftime('%Y'),
                          "month": event_timestamp.strftime('%m'),
                          "day": event_timestamp.strftime('%d'),
                          "hour": event_timestamp.strftime('%H'),
                          "minute": event_timestamp.strftime('%M')
                          }
 
        # Create output Firehose record and add modified payload and record ID to it.
        firehose_record_output = {'recordId': firehose_record_input['recordId'],
                                  'data': firehose_record_input['data'],
                                  'result': 'Ok',
                                  'metadata': { 'partitionKeys': partition_keys }}
 
        # Must set proper record ID
        # Add the record to the list of output records.
 
        firehose_records_output['records'].append(firehose_record_output)
 
    # At the end return processed records
    return firehose_records_output
```

Berikut ini adalah contoh Firehose stream processing fungsi Lambda di Go yang memutar ulang setiap catatan baca dari input ke output dan mengekstrak kunci partisi dari catatan.

```
package main

import (
	"fmt"
	"encoding/json"
	"time"
	"strconv"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

type DataFirehoseEventRecordData struct {
	CustomerId string `json:"customerId"`
}

func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) {

	fmt.Printf("InvocationID: %s\n", evnt.InvocationID)
	fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn)
	fmt.Printf("Region: %s\n", evnt.Region)

	var response events.DataFirehoseResponse

	for _, record := range evnt.Records {
		fmt.Printf("RecordID: %s\n", record.RecordID)
		fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp)

		var transformedRecord events.DataFirehoseResponseRecord
		transformedRecord.RecordID = record.RecordID
		transformedRecord.Result = events.DataFirehoseTransformedStateOk
		transformedRecord.Data = record.Data

		var metaData events.DataFirehoseResponseRecordMetadata
		var recordData DataFirehoseEventRecordData
		partitionKeys := make(map[string]string)

		currentTime := time.Now()
		json.Unmarshal(record.Data, &recordData)
		partitionKeys["customerId"] = recordData.CustomerId
		partitionKeys["year"] = strconv.Itoa(currentTime.Year())
		partitionKeys["month"] = strconv.Itoa(int(currentTime.Month()))
		partitionKeys["date"] = strconv.Itoa(currentTime.Day())
		partitionKeys["hour"] = strconv.Itoa(currentTime.Hour())
		partitionKeys["minute"] = strconv.Itoa(currentTime.Minute())
		metaData.PartitionKeys = partitionKeys
		transformedRecord.Metadata = metaData

		response.Records = append(response.Records, transformedRecord)
	}

	return response, nil
}

func main() {
	lambda.Start(handleRequest)
}
```