耐久関数を使用したイベントソースマッピング - AWS Lambda

耐久関数を使用したイベントソースマッピング

耐久関数は、すべての Lambda イベントソースマッピングで機能します。耐久関数のイベントソースマッピングは、標準関数の設定と同じ方法で設定します。イベントソースマッピングは、Amazon SQS、Kinesis、DynamoDB Streams などのイベントソースを自動的にポーリングし、レコードのバッチで関数を呼び出します。

イベントソースマッピングは、複雑なマルチステップワークフローでストリームまたはキューを処理する耐久関数に有用です。例えば、再試行、外部 API コール、人間の承認を使用して Amazon SQS メッセージを処理する耐久関数を作成できます。

イベントソースマッピングが耐久関数を呼び出す方法

イベントソースマッピングは、耐久関数を同期的に呼び出し、耐久性のある実行が完全に終了するのを待ってから、次のバッチを処理したり、レコードを処理済みとしてマークします。耐久性のある実行の合計時間が 15 分を超えると、実行はタイムアウトして失敗します。イベントソースマッピングはタイムアウト例外を受け取り、再試行設定に従って処理します。

15 分間の実行制限

耐久関数がイベントソースマッピングによって呼び出される場合、耐久性のある実行の合計時間は 15 分を超えることはできません。この制限は、個々の関数呼び出しだけでなく、最初から最後までの耐久性のある実行全体に適用されます。

この 15 分の制限は、Lambda 関数のタイムアウト (最大 15 分) とは別です。関数タイムアウトは、個々の呼び出しを実行できる時間を制御し、耐久性のある実行のタイムアウトは、実行開始から完了までの合計経過時間を制御します。

シナリオ例

  • 有効: 耐久関数は、Amazon SQS メッセージを 3 つのステップで処理します。各ステップには 2 分かかり、最後のステップを完了するまで 5 分待機します。合計実行時間: 11 分。これは、合計が 15 分未満であるために機能します。

  • 無効: 耐久関数は Amazon SQS メッセージを処理し、最初の処理を 2 分で完了してから、外部コールバックが完了するまで 20 分待機します。合計実行時間: 22 分。これは 15 分の制限を超え、失敗します。

  • 無効: 耐久関数は、ステップ間に合計 30 分におよぶ複数の待機操作を伴いつつ、Kinesis レコードを処理します。個々の呼び出しは迅速に完了しますが、合計実行時間は 15 分を超えています。

重要

イベントソースマッピングを使用する場合、耐久性のある実行のタイムアウトを 15 分以下に設定してください。そうしないと、イベントソースマッピングの作成は失敗します。ワークフローで実行時間が長い場合は、以下で説明する中間関数パターンを使用します。

イベントソースマッピングの設定

Lambda コンソール、AWS CLI、または AWS SDK を使用して、耐久性のある関数のイベントソースマッピングを設定します。すべての標準イベントソースマッピングのプロパティは、耐久関数に適用されます。

aws lambda create-event-source-mapping \ --function-name arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1 \ --event-source-arn arn:aws:sqs:us-east-1:123456789012:my-queue \ --batch-size 10 \ --maximum-batching-window-in-seconds 5

耐久関数のイベントソースマッピングを設定するときは、必ず修飾 ARN (バージョン番号またはエイリアス) を使用してください。

イベントソースマッピングによるエラー処理

イベントソースマッピングは、耐久関数で動作する組み込みのエラー処理を提供します。

  • 再試行動作: 最初の呼び出しが失敗した場合、イベントソースマッピングは再試行設定に従って再試行します。要件に基づいて最大再試行回数と再試行間隔を設定します。

  • デッドレターキュー: すべての再試行後に失敗したレコードをキャプチャするようにデッドレター キューを構成します。これにより、メッセージの損失を防ぎ、失敗したレコードを手動で検査できます。

  • 部分的なバッチ処理の失敗: Amazon SQS および Kinesis では、部分的なバッチ処理の失敗レポートを使用して、レコードを個別に処理し、失敗したレコードのみを再試行します。

  • Bisect on error: Kinesis および DynamoDB Streams の場合、bisect on error 機能を有効にして、失敗したバッチを分割し、問題のあるレコードを分離します。

注記

耐久性のある関数では、エラー処理用のデッドレターキュー (DLQ) がサポートされていますが、Lambda 送信先はサポートされていません。DLQ を設定し、失敗した呼び出しのレコードをキャプチャします。

イベントソースマッピングエラー処理の詳細については、「イベントソースマッピング」を参照してください。

長時間実行されるワークフローに中間関数を使用する

ワークフローの完了に 15 分以上かかる場合は、イベントソースマッピングと耐久関数の間に中間標準 Lambda 関数を使用します。中間関数は、イベントソースマッピングからイベントを受信し、耐久関数を非同期的に呼び出し、15 分間の実行制限を削除します。

このパターンは、イベントソースマッピングの同期呼び出しモデルを耐久関数の長時間かかる実行モデルから切り離します。イベントソースマッピングは中間関数を呼び出し、中間関数は耐久性のある実行を開始した後すぐに返されます。その後、耐久関数は、必要な期間 (最大 1 年間) 独立して実行されます。

アーキテクチャ

中間関数パターンでは、次の 3 つのコンポーネントを使用します。

  1. イベントソースマッピング: イベントソース (Amazon SQS、Kinesis、DynamoDB Streams) をポーリングし、レコードのバッチと同期的に中間関数を呼び出します。

  2. 中間関数: イベントソースマッピングからイベントを受信し、必要に応じてデータを検証および変換し、耐久関数を非同期的に呼び出す標準の Lambda 関数。この関数は迅速に (通常は 1 秒未満) 完了し、イベントソースマッピングに制御を返します。

  3. 耐久関数: 長時間の実行が可能な複雑かつ多段階のロジックでイベントを処理します。非同期的に呼び出されるため、15 分の制限による制約はありません。

実装

中間関数は、イベントソースマッピングからイベント全体を受信し、耐久関数を非同期的に呼び出します。実行名パラメータを使用してべき等実行が開始されるようにし、イベントソースマッピングが再試行された場合に重複処理を防止します。

TypeScript
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda'; import { SQSEvent } from 'aws-lambda'; import { createHash } from 'crypto'; const lambda = new LambdaClient({}); export const handler = async (event: SQSEvent) => { // Invoke durable function asynchronously with execution name await lambda.send(new InvokeCommand({ FunctionName: 'arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1', InvocationType: 'Event', Payload: JSON.stringify({ executionName: event.Name, event: event }) })); return { statusCode: 200 }; };
Python
import boto3 import json import hashlib lambda_client = boto3.client('lambda') def handler(event, context): # Invoke durable function asynchronously with execution name lambda_client.invoke( FunctionName='arn:aws:lambda:us-east-1:123456789012:function:my-durable-function:1', InvocationType='Event', Payload=json.dumps({ 'executionName': execution_name, 'event': event["name"] }) ) return {'statusCode': 200}

中間関数自体のべき等性については、Powertools for AWS Lambda を使用して、イベントソースマッピングが中間関数を再試行した場合に、耐久関数の重複呼び出しを防止します。

耐久関数は、実行名を持つペイロードを受信し、長時間実行されるロジックを持つすべてのレコードを処理します。

TypeScript
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js'; export const handler = withDurableExecution( async (payload: any, context: DurableContext) => { const sqsEvent = payload.event; // Process each record with complex, multi-step logic const results = await context.map( sqsEvent.Records, async (ctx, record) => { const validated = await ctx.step('validate', async () => { return validateOrder(JSON.parse(record.body)); }); // Wait for external approval (could take hours or days) const approval = await ctx.waitForCallback( 'approval', async (callbackId) => { await requestApproval(callbackId, validated); }, { timeout: { hours: 48 } } ); // Complete processing return await ctx.step('complete', async () => { return completeOrder(validated, approval); }); } ); return { statusCode: 200, processed: results.getResults().length }; } );
Python
from aws_durable_execution_sdk_python import durable_execution, DurableContext import json @durable_execution def handler(payload, context: DurableContext): sqs_event = payload['event'] # Process each record with complex, multi-step logic def process_record(ctx, record): validated = ctx.step( lambda _: validate_order(json.loads(record['body'])), name='validate' ) # Wait for external approval (could take hours or days) approval = ctx.wait_for_callback( lambda callback_id: request_approval(callback_id, validated), name='approval', config=WaitForCallbackConfig(timeout_seconds=172800) # 48 hours ) # Complete processing return ctx.step( lambda _: complete_order(validated, approval), name='complete' ) results = context.map(sqs_event['Records'], process_record) return {'statusCode': 200, 'processed': len(results.get_results())}

主な考慮事項

このパターンは、耐久性のある実行からイベントソースマッピングを切り離すことで、15 分間の実行制限を削除します。中間関数は、耐久性のある実行を開始した直後に返り、イベントソースマッピングの処理を続行できるようにします。その後、耐久関数は、必要な期間だけ独立して実行されます。

中間関数は、耐久性のある実行が完了したときではなく、耐久関数を呼び出すときに成功します。耐久性のある実行が後で失敗した場合、イベントソースマッピングはすでにバッチを正常に処理しているため、再試行されません。耐久関数にエラー処理を実装し、失敗した実行に対してデッドレターキューを設定します。

実行名パラメータを使用して、べき等実行が開始されるようにします。イベントソースマッピングが中間関数を再試行する場合、実行名がすでに存在するため、耐久関数は重複した実行を開始しません。

サポートされているイベントソース

耐久関数は、イベントソースマッピングを使用するすべての Lambda イベントソースをサポートします。

  • Amazon SQS キュー (スタンダードおよび FIFO)

  • Kinesis Streams

  • DynamoDB Streams

  • Amazon Managed Streaming for Apache Kafka (Amazon MSK)

  • セルフマネージド Apache Kafka

  • Amazon MQ (ActiveMQ および RabbitMQ)

  • Amazon DocumentDB 変更ストリーム

耐久関数を呼び出す場合、すべてのイベントソースタイプには 15 分間の耐久性のある実行制限が適用されます。