Kafka トピックを障害発生時の送信先として使用する - AWS Lambda

Kafka トピックを障害発生時の送信先として使用する

Kafka トピックは、Kafka イベントソースマッピングの障害発生時の送信先として設定できます。再試行回数を超えた後、またはレコードが最大経過時間を超えた後に Lambda がレコードを処理できなくなると、Lambda は失敗したレコードを指定された Kafka トピックに送信して、後で処理します。

Kafka 障害発生時の送信先のしくみ

Kafka トピックを障害発生時の送信先として設定すると、Lambda は Kafka プロデューサーとして機能し、失敗したレコードを送信先トピックに書き込みます。これにより、Kafka インフラストラクチャ内にデッドレタートピック (DLT) のパターンが作成されます。

  • 同じクラスター要件 – 送信先トピックは、送信元トピックと同じ Kafka クラスターに存在する必要があります。

  • 実際のレコードコンテンツ – Kafka の送信先は、実際の失敗したレコードと障害のメタデータを受け取ります。

  • 反復防止 – Lambda は、送信元トピックと送信先トピックが同じ場所になっている設定をブロックすることで、無限ループを防止します。

Kafka 障害発生時の送信先の設定

Kafka イベントソースマッピングを作成または更新するときに、Kafka トピックを障害発生時の送信先として設定できます。

Kafka 送信先の設定 (コンソール)

Kafka トピックを障害発生時の送信先として設定するには (コンソール)
  1. Lambda コンソールの [関数] ページを開きます。

  2. 関数の名前を選択します。

  3. 次のいずれかを行います。

    • 新しい Kafka トリガーを追加するには、[関数の概要][トリガーを追加] を選択します。

    • 既存の Kafka トリガーを変更するには、トリガーを選択し、[編集] を選択します。

  4. [追加設定][障害発生時の送信先] で、[Kafka トピック] を選択します。

  5. [トピック名] に、失敗したレコードを送信する Kafka トピックの名前を入力します。

  6. [追加] または [削除] を選択します。

Kafka 送信先の設定 (AWS CLI)

kafka:// プレフィックスを使用して、障害発生時の送信先として Kafka トピックを指定します。

Kafka 送信先を使用したイベントソースマッピングの作成

次の例では、障害発生時の送信先として Kafka トピックを使用して Amazon MSK イベントソースマッピングを作成します。

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

セルフマネージド型の Kafka の場合は、同じ構文を使用します。

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 送信先の更新

update-event-source-mapping コマンドを使用して Kafka 送信先を追加または変更します。

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 送信先のレコード形式

Lambda が失敗したレコードを Kafka トピックに送信すると、各メッセージにはその障害に関するメタデータと実際のレコードコンテンツの両方が含まれます。

障害のメタデータ

メタデータには、レコードが失敗した理由に関する情報と元のバッチに関する詳細が含まれます。

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

パーティションキーの動作

Lambda は、送信先トピックに生成するときに、元のレコードと同じパーティションキーを使用します。元のレコードにキーがない場合、Lambda は送信先トピックで使用可能なすべてのパーティションで Kafka のデフォルトのラウンドロビンパーティショニングを使用します。

要件と制限

  • プロビジョニングモードが必要 – Kafka 障害発生時の送信先は、プロビジョニングモードが有効になっているイベントソースマッピングでのみ使用できます。

  • 同じクラスターのみ – 送信先トピックは、送信元トピックと同じ Kafka クラスターに存在する必要があります。

  • トピックのアクセス許可 – イベントソースマッピングには、送信先トピックへの書き込みアクセス許可が必要です。例:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 反復なし – 送信先トピック名を送信元トピック名と同じにすることはできません。