Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Memahami kunci partisi
Dengan partisi dinamis, Anda membuat kumpulan data yang ditargetkan dari data streaming S3 dengan mempartisi data berdasarkan kunci partisi. Tombol partisi memungkinkan Anda memfilter data streaming berdasarkan nilai tertentu. Misalnya, jika Anda perlu memfilter data berdasarkan ID pelanggan dan negara, Anda dapat menentukan bidang data customer_id
sebagai satu kunci partisi dan bidang data country
sebagai kunci partisi lainnya. Kemudian, Anda menentukan ekspresi (menggunakan format yang didukung) untuk menentukan awalan bucket S3 yang akan dikirimkan rekaman data yang dipartisi secara dinamis.
Anda dapat membuat kunci partisi dengan metode berikut.
-
Parsing inline - metode ini menggunakan mekanisme dukungan built-in Firehose, parser jq
, untuk mengekstrak kunci untuk partisi dari catatan data yang dalam format JSON. Saat ini, kami hanya mendukung jq 1.6
versi. -
AWS Fungsi Lambda — metode ini menggunakan AWS Lambda fungsi tertentu untuk mengekstrak dan mengembalikan bidang data yang diperlukan untuk partisi.
penting
Saat Anda mengaktifkan partisi dinamis, Anda harus mengonfigurasi setidaknya satu dari metode ini untuk mempartisi data Anda. Anda dapat mengonfigurasi salah satu metode ini untuk menentukan kunci partisi Anda atau keduanya secara bersamaan.
Buat kunci partisi dengan penguraian sebaris
Untuk mengonfigurasi penguraian sebaris sebagai metode partisi dinamis untuk data streaming Anda, Anda harus memilih parameter catatan data yang akan digunakan sebagai kunci partisi dan memberikan nilai untuk setiap kunci partisi yang ditentukan.
Catatan data sampel berikut menunjukkan bagaimana Anda dapat menentukan kunci partisi untuk itu dengan parsing inline. Perhatikan bahwa data harus dikodekan dalam format Base64. Anda juga dapat merujuk ke contoh CLI.
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }
Misalnya, Anda dapat memilih untuk mempartisi data Anda berdasarkan customer_id
parameter atau event_timestamp
parameter. Ini berarti Anda ingin nilai customer_id
parameter atau event_timestamp
parameter di setiap catatan digunakan dalam menentukan awalan S3 yang akan dikirimkan catatan. Anda juga dapat memilih parameter bersarang, seperti device
dengan ekspresi.type.device
. Logika partisi dinamis Anda dapat bergantung pada beberapa parameter.
Setelah memilih parameter data untuk kunci partisi Anda, Anda kemudian memetakan setiap parameter ke ekspresi jq yang valid. Tabel berikut menunjukkan pemetaan parameter ke ekspresi jq:
Parameter | ekspresi jq |
---|---|
customer_id |
.customer_id |
device |
.type.perangkat |
year |
.event_timestamp| strftime (“%Y”) |
month |
.event_timestamp| strftime (“%m”) |
day |
.event_timestamp| strftime (“%d”) |
hour |
.event_timestamp| strftime (“%H”) |
Saat runtime, Firehose menggunakan kolom kanan di atas untuk mengevaluasi parameter berdasarkan data di setiap record.
Buat tombol partisi dengan fungsi AWS Lambda
Untuk catatan data terkompresi atau terenkripsi, atau data yang dalam format file apa pun selain JSON, Anda dapat menggunakan AWS Lambda fungsi terintegrasi dengan kode kustom Anda sendiri untuk mendekompresi, mendekripsi, atau mengubah catatan untuk mengekstrak dan mengembalikan bidang data yang diperlukan untuk partisi. Ini adalah perluasan dari fungsi transformasi Lambda yang ada yang tersedia saat ini dengan Firehose. Anda dapat mengubah, mengurai, dan mengembalikan bidang data yang kemudian dapat Anda gunakan untuk partisi dinamis menggunakan fungsi Lambda yang sama.
Berikut ini adalah contoh Firehose stream processing fungsi Lambda di Python yang memutar ulang setiap catatan baca dari input ke output dan mengekstrak kunci partisi dari catatan.
from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "day": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output
Berikut ini adalah contoh Firehose stream processing fungsi Lambda di Go yang memutar ulang setiap catatan baca dari input ke output dan mengekstrak kunci partisi dari catatan.
package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }