기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
파티셔닝 키 이해
동적 파티셔닝을 사용하여, 파티션 키를 기반으로 데이터를 분할하고 스트리밍 S3 데이터에서 대상 데이터 세트를 생성합니다. 파티션 키를 사용하면 특정 값에 기반하여 스트리밍 데이터를 필터링할 수 있습니다. 예를 들어, 고객 ID 및 국가를 기준으로 데이터를 필터링해야 하는 경우 customer_id의 데이터 필드를 하나의 파티션 키로 지정하고 country의 데이터 필드는 또 다른 파티션 키로 지정할 수 있습니다. 그런 다음 표현식을 (지원되는 형식을 사용해) 지정하여 동적으로 파티셔닝된 데이터 레코드를 전송할 S3 버킷 접두사를 정의합니다.
다음 방법을 사용하여 파티셔닝 키를 생성할 수 있습니다.
-
인라인 구문 분석 - 이 방법은 Firehose의 내장 지원 메커니즘인 jq 구문 분석기
를 사용하여 JSON 형식의 데이터 레코드에서 파티션 키를 추출합니다. 현재는 jq 1.6버전만 지원합니다. -
AWS Lambda 함수 - 이 방법은 지정된 AWS Lambda 함수를 사용하여 파티셔닝에 필요한 데이터 필드를 추출하고 반환합니다.
중요
동적 파티셔닝을 사용할 경우, 이러한 방법 중 하나 이상을 구성하여 데이터를 분할하도록 해야 합니다. 두 방법 중 하나를 구성하여 파티션 키를 지정하거나 두 방법을 동시에 지정할 수 있습니다.
인라인 구문 분석 방법으로 파티션 키 만들기
인라인 구문 분석 방법으로 스트리밍 데이터의 동적 파티셔닝을 구성하려면, 파티션 키로 사용할 데이터 레코드 파라미터를 선택하고 지정된 각 파티션 키의 값을 입력해야 합니다.
다음 샘플 데이터 레코드는 인라인 구문 분석을 사용하여 레코드에 대한 파티션 키를 정의하는 방법을 보여줍니다. 데이터는 Base64 형식으로 인코딩되어야 합니다. CLI 예제를 참조할 수도 있습니다.
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }
예를 들면 customer_id 파라미터 또는 event_timestamp 파라미터를 기반으로 데이터를 분할하도록 선택할 수 있습니다. 즉, 각 레코드의 customer_id 파라미터 또는 event_timestamp 파라미터의 값을 사용하여 레코드가 전송될 S3 접두사를 결정하는 것입니다. .type.device 표현식이 있는 device와(과) 같이 중첩된 파라미터를 선택할 수도 있습니다. 동적 파티셔닝 로직은 여러 가지 파라미터에 따라 달라질 수 있습니다.
파티션 키의 데이터 파라미터를 선택한 다음 각 파라미터를 유효한 jq 표현식으로 매핑합니다. 다음 표에는 파라미터를 jq 표현식으로 매핑한 내용이 나와 있습니다.
| 파라미터 | jq 표현식 |
|---|---|
customer_id |
.customer_id |
device |
.type.device |
year |
.event_timestamp| strftime("%Y") |
month |
.event_timestamp| strftime("%m") |
day |
.event_timestamp| strftime("%d") |
hour |
.event_timestamp| strftime("%H") |
런타임에서, Firehose는 위의 오른쪽 열을 사용하여 각 레코드의 데이터를 기준으로 파라미터를 평가합니다.
AWS Lambda 함수를 사용하여 파티셔닝 키 생성
압축 또는 암호화된 데이터 레코드 또는 JSON 이외의 파일 형식으로 된 데이터의 경우, 통합 AWS Lambda 함수를 사용자 지정 코드와 함께 사용하여 레코드를 압축 해제, 암호 해독 또는 변환하여 파티셔닝에 필요한 데이터 필드를 추출하고 반환할 수 있습니다. 이는 현재 Firehose와 함께 사용 가능한 기존 변환 Lambda 함수의 확장 기능입니다. 해당 데이터 필드를 변환, 구문 분석, 반환한 다음 동일한 Lambda 함수를 사용하여 동적 파티셔닝에 사용할 수 있습니다.
다음은 Python으로 된 Firehose 스트림 처리 Lambda 함수의 예시로, 입력에서 출력까지 모든 읽기 레코드를 재생하고 레코드에서 파티셔닝 키를 추출합니다.
from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "day": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output
다음은 Go로 된 Firehose 스트림 처리 Lambda 함수의 예시로, 입력에서 출력까지 모든 읽기 레코드를 재생하고 레코드에서 파티셔닝 키를 추출합니다.
package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }