使用 Kafka 主題做為失敗時的目的地 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Kafka 主題做為失敗時的目的地

您可以將 Kafka 主題設定為 Kafka 事件來源映射的失敗時目的地。當 Lambda 在耗盡重試嘗試後或記錄超過最長存留期後無法處理記錄時,Lambda 會將失敗的記錄傳送至指定的 Kafka 主題,以供稍後處理。

Kafka 失敗時目的地的運作方式

當您將 Kafka 主題設定為失敗時的目的地時,Lambda 會充當 Kafka 生產者,並將失敗的記錄寫入目的地主題。這會在 Kafka 基礎設施內建立無效字母主題 (DLT) 模式。

  • 相同的叢集需求 – 目的地主題必須與來源主題存在於相同的 Kafka 叢集中。

  • 實際記錄內容 – Kafka 目的地會收到實際失敗的記錄以及失敗中繼資料。

  • 防止遞迴 – Lambda 透過封鎖來源和目的地主題相同的組態來防止無限迴圈。

設定 Kafka 失敗時的目的地

您可以在建立或更新 Kafka 事件來源映射時,將 Kafka 主題設定為失敗時的目的地。

設定 Kafka 目的地 (主控台)

將 Kafka 主題設定為失敗時的目的地 (主控台)
  1. 開啟 Lambda 主控台中的函數頁面

  2. 選擇您的函數名稱。

  3. 執行以下任意一項:

    • 若要新增新的 Kafka 觸發條件,請在函數概觀下,選擇新增觸發條件

    • 若要修改現有的 Kafka 觸發條件,請選擇觸發條件,然後選擇編輯

  4. 其他設定下,針對失敗目的地,選擇 Kafka 主題

  5. 針對主題名稱,輸入您要傳送失敗記錄的 Kafka 主題名稱。

  6. 選擇新增儲存

設定 Kafka 目的地 (AWS CLI)

使用 kafka://字首將 Kafka 主題指定為失敗時的目的地。

使用 Kafka 目的地建立事件來源映射

下列範例會建立 Amazon MSK 事件來源映射,並以 Kafka 主題做為失敗時的目的地:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

對於自我管理的 Kafka,請使用相同的語法:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

更新 Kafka 目的地

使用 update-event-source-mapping命令來新增或修改 Kafka 目的地:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 目的地的記錄格式

當 Lambda 將失敗的記錄傳送到 Kafka 主題時,每個訊息都會包含有關失敗和實際記錄內容的中繼資料。

失敗中繼資料

中繼資料包含記錄失敗原因的相關資訊,以及原始批次的詳細資訊:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

分割區索引鍵行為

在產生目的地主題時,Lambda 會使用原始記錄中的相同分割區索引鍵。如果原始記錄沒有金鑰,Lambda 會在目的地主題中的所有可用分割區中使用 Kafka 的預設循環配置分割。

要求與限制

  • 需要佈建模式 – Kafka 故障時目的地僅適用於啟用佈建模式的事件來源映射。

  • 僅限相同叢集 – 目的地主題必須存在於與來源主題相同的 Kafka 叢集中。

  • 主題許可 – 您的事件來源映射必須具有目的地主題的寫入許可。範例:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 無遞迴 – 目的地主題名稱不能與來源主題名稱相同。