本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Kafka 主題做為失敗時的目的地
您可以將 Kafka 主題設定為 Kafka 事件來源映射的失敗時目的地。當 Lambda 在耗盡重試嘗試後或記錄超過最長存留期後無法處理記錄時,Lambda 會將失敗的記錄傳送至指定的 Kafka 主題,以供稍後處理。
Kafka 失敗時目的地的運作方式
當您將 Kafka 主題設定為失敗時的目的地時,Lambda 會充當 Kafka 生產者,並將失敗的記錄寫入目的地主題。這會在 Kafka 基礎設施內建立無效字母主題 (DLT) 模式。
-
相同的叢集需求 – 目的地主題必須與來源主題存在於相同的 Kafka 叢集中。
-
實際記錄內容 – Kafka 目的地會收到實際失敗的記錄以及失敗中繼資料。
-
防止遞迴 – Lambda 透過封鎖來源和目的地主題相同的組態來防止無限迴圈。
設定 Kafka 失敗時的目的地
您可以在建立或更新 Kafka 事件來源映射時,將 Kafka 主題設定為失敗時的目的地。
設定 Kafka 目的地 (主控台)
將 Kafka 主題設定為失敗時的目的地 (主控台)
-
開啟 Lambda 主控台中的函數頁面
。 -
選擇您的函數名稱。
-
執行以下任意一項:
-
若要新增新的 Kafka 觸發條件,請在函數概觀下,選擇新增觸發條件。
-
若要修改現有的 Kafka 觸發條件,請選擇觸發條件,然後選擇編輯。
-
-
在其他設定下,針對失敗目的地,選擇 Kafka 主題。
-
針對主題名稱,輸入您要傳送失敗記錄的 Kafka 主題名稱。
-
選擇新增或儲存。
設定 Kafka 目的地 (AWS CLI)
使用 kafka://字首將 Kafka 主題指定為失敗時的目的地。
使用 Kafka 目的地建立事件來源映射
下列範例會建立 Amazon MSK 事件來源映射,並以 Kafka 主題做為失敗時的目的地:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
對於自我管理的 Kafka,請使用相同的語法:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
更新 Kafka 目的地
使用 update-event-source-mapping命令來新增或修改 Kafka 目的地:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Kafka 目的地的記錄格式
當 Lambda 將失敗的記錄傳送到 Kafka 主題時,每個訊息都會包含有關失敗和實際記錄內容的中繼資料。
失敗中繼資料
中繼資料包含記錄失敗原因的相關資訊,以及原始批次的詳細資訊:
{
"requestContext": {
"requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
"condition": "RetriesExhausted",
"approximateInvokeCount": 3
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T18:16:05.568Z",
"KafkaBatchInfo": {
"batchSize": 1,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"payloadSize": 1162,
"recordInfo": {
"offset": "49601189658422359378836298521827638475320189012309704722",
"timestamp": "2019-11-14T18:16:04.835Z"
}
},
"payload": {
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"records": {
"my-topic-0": [
{
"headers": [],
"key": "dGVzdC1rZXk=",
"offset": 100,
"partition": 0,
"timestamp": 1749116692330,
"timestampType": "CREATE_TIME",
"topic": "my-topic",
"value": "dGVzdC12YWx1ZQ=="
}
]
}
}
}
分割區索引鍵行為
在產生目的地主題時,Lambda 會使用原始記錄中的相同分割區索引鍵。如果原始記錄沒有金鑰,Lambda 會在目的地主題中的所有可用分割區中使用 Kafka 的預設循環配置分割。
要求與限制
-
需要佈建模式 – Kafka 故障時目的地僅適用於啟用佈建模式的事件來源映射。
-
僅限相同叢集 – 目的地主題必須存在於與來源主題相同的 Kafka 叢集中。
-
主題許可 – 您的事件來源映射必須具有目的地主題的寫入許可。範例:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] } -
無遞迴 – 目的地主題名稱不能與來源主題名稱相同。