Kafka イベントソースのエラー処理コントロールの設定
Lambda が Kafka イベントソースマッピングのエラーと再試行を処理する方法を設定できます。これらの設定は、Lambda が失敗したレコードを処理し、再試行動作を管理する方法を制御するのに役立ちます。
使用可能な再試行の設定
Amazon MSK イベントソースとセルフマネージド Kafka イベントソースの両方で、次の再試行の設定を使用できます。
-
最大再試行回数 – 関数がエラーを返したときに Lambda が再試行する回数の上限です。これにより、最初の呼び出しの試行はカウントされません。デフォルト値は -1 (無制限) です。
-
最大レコード有効時間 – Lambda が関数に送信するレコードの最大経過時間。デフォルト値は -1 (無制限) です。
-
エラー発生時にバッチを分割 – 関数がエラーを返す際、バッチを 2 つの小さなバッチに分割し、それぞれを個別に再試行します。これにより、問題のあるレコードを隔離できます。
-
部分的なバッチレスポンス – 関数がバッチ処理で失敗したレコードに関する情報を返せるようにするため、Lambda は失敗したレコードだけを再試行できます。
エラー処理コントロールの設定 (コンソール)
Lambda コンソールで Kafka イベントソースマッピングを作成または更新するときに再試行動作を設定できます。
Kafka イベントソースの再試行動作を設定するには (コンソール)
-
Lambda コンソールの [関数]
ページを開きます。 -
関数の名前を選択します。
-
次のいずれかを行います。
-
新しい Kafka トリガーを追加するには、[関数の概要] で [トリガーを追加] を選択します。
-
既存の Kafka トリガーを変更するには、トリガーを選択し、[編集] を選択します。
-
-
[イベントポーラー設定] で、プロビジョンドモードを選択してエラー処理コントロールを設定します。
-
[再試行] には、最大再試行回数 (0~10000、または無限の場合は -1) を入力します。
-
[最大レコード有効時間] には、最大経過時間を秒単位で入力します (60~604800、または無限の場合は -1)。
-
エラー発生時にバッチ分割を有効にするには、[エラー発生時にバッチを分割] を選択します。
-
部分的なバッチレスポンスを有効にするには、[ReportBatchItemFailures] を選択します。
-
-
[追加] または [削除] を選択します。
再試行動作の設定 (AWS CLI)
次の AWS CLI コマンドを使用して、Kafka イベントソースマッピングの再試行動作を設定します。
再試行設定を使用したイベントソースマッピングの作成
次の例では、エラー処理コントロールを使用してセルフマネージド型の Kafka イベントソースマッピングを作成します。
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Amazon MSK イベントソースの場合:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
再試行設定の更新
update-event-source-mapping コマンドを使用して、既存のイベントソースマッピングの再試行設定を変更します。
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
PartialBatchResponse
ReportBatchItemFailures とも呼ばれる部分的なバッチレスポンスは、Lambda が Kafka ソースと統合する際のエラー処理の主要な機能です。この機能がない場合は、バッチ内のいずれかの項目でエラーが発生すると、そのバッチ内のすべてのメッセージが再処理されます。部分的バッチレスポンスを有効にして実装すると、ハンドラーは失敗したメッセージの識別子のみを返します。これにより、Lambda はそれらの特定の項目のみを再試行できます。これにより、失敗したメッセージを含むバッチの処理方法をより細かく制御できます。
バッチエラーを報告するには、次の JSON スキーマを使用します。
{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
重要
空の有効な JSON または null を返す場合、イベントソースマッピングはバッチを正常に処理されたと見なします。呼び出されたイベントに存在しなかった無効な topic-partition_number または返されたオフセットは失敗として扱われ、バッチ全体が再試行されます。
次のコードの例では、Kafka ソースからイベントを受け取る Lambda 関数の部分的なバッチレスポンスの実装方法が示されています。この関数は、レスポンスとしてバッチアイテムの失敗を報告し、対象のメッセージを後で再試行するよう Lambda に伝えます。
このアプローチを示す Python Lambda ハンドラーの実装を次に示します。
import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass
Node.js のバージョンは次のとおりです。
const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };