耐久性のある実行 SDK - AWS Lambda

耐久性のある実行 SDK

耐久性のある実行 SDK は、耐久性のある関数を構築するための基盤です。進行状況のチェックポイント、再試行の処理、実行フローの管理に必要なプリミティブが提供されます。SDK によってチェックポイントの管理および再生の複雑さが抽象化されるため、自動的に耐障害性になるシーケンシャルコードを記述できます。

SDK は JavaScript、TypeScript、Python に利用できます。API の完全なドキュメントや例については、GitHub の「JavaScript/TypeScript SDK」および「Python SDK」を参照してください。

DurableContext

SDK により、耐久性のあるすべてのオペレーションが公開される DurableContext オブジェクトで関数に提供されます。このコンテキストは標準 Lambda コンテキストを置き換えて、チェックポイントの作成、実行フローの管理、外部システムとの調整に関する方法を説明します。

SDK を使用するには、Lambda ハンドラーを耐久性のある実行ラッパーでラップします。

TypeScript
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js'; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { // Your function receives DurableContext instead of Lambda context // Use context.step(), context.wait(), etc. return result; } );
Python
from aws_durable_execution_sdk_python import durable_execution, DurableContext @durable_execution def handler(event: dict, context: DurableContext): # Your function receives DurableContext # Use context.step(), context.wait(), etc. return result

ラッパーは関数の呼び出しを傍受し、既存のチェックポイントログをロードし、再生およびチェックポイントを管理する DurableContext を提供します。

SDK の動作

SDK により、耐久性のある実行を実現する 3 つの重要な責任が処理されます。

チェックポイント管理: 関数が耐久性のあるオペレーションを実行すると、SDK によってチェックポイントが自動的に作成されます。各チェックポイントでは、オペレーションタイプ、入力、結果が記録されます。関数がステップを完了すると、SDK によって続行される前にチェックポイントが保持されます。関数が中断された場合、完了したすべてのオペレーションから再開できます。

再生の調整: 一時停止または中断の後に関数が再開されると、SDK によって再生が実行されます。コードを最初から実行しますが、再実行せずに保存されたチェックポイント結果が使用されて、完了したオペレーションがスキップされます。SDK によって決定的な再生が実現されます。入力およびチェックポイントログが同じであることを前提として、関数で同じ結果が生成されます。

状態の分離: SDK により、ビジネスロジックとは別に実行状態が維持されます。耐久性のある各実行には、他の実行ではアクセスできない独自のチェックポイントログがあります。SDK によって保管中のチェックポイントデータが暗号化され、再生間で状態の一貫性が維持されます。

チェックポイントの仕組み

耐久性のあるオペレーションを呼び出すと、SDK は次のシーケンスに従います。

  1. 既存のチェックポイントの確認: SDK により、このオペレーションは以前の呼び出しで既に完了されているかどうか確認されます。チェックポイントが存在する場合、SDK によってオペレーションが再実行されずに保存された結果が返されます。

  2. オペレーションを実行する: チェックポイントが存在しない場合、SDK によってオペレーションコードが実行されます。ステップの場合、関数を呼び出すことを意味します。待機の場合、再開のスケジューリングを意味します。

  3. チェックポイントの作成: オペレーションが完了すると、SDK によって結果がシリアル化されてチェックポイントが作成されます。チェックポイントにはオペレーションタイプ、名前、入力、結果、タイムスタンプが含まれます。

  4. チェックポイントの永続化: SDK によって Lambda チェックポイント API が呼び出され、チェックポイントが永続化されます。実行を続行する前にチェックポイントの耐久性が実現されます。

  5. 結果を返す: SDK によってオペレーション結果がコードに返され、次のオペレーションに進みます。

このシーケンスにより、オペレーションが完了したら結果は安全に保存されます。関数がある時点で中断された場合、SDK によって最後に完了されたチェックポイントまで再生することができます。

再生動作

一時停止または中断後に関数が再開されると、SDK によって再生が実行されます。

  1. チェックポイントログの読み込み: SDK により、この実行のチェックポイントログは Lambda から取得されます。

  2. 最初から実行: 一時停止した時点からではなく、SDK によって最初からハンドラー関数が呼び出されます。

  3. 完了した耐久性のあるオペレーションのスキップ: コードが耐久性のあるオペレーションを呼び出すと、SDK によってチェックポイントログが照合され、それぞれ確認されます。完了した耐久性のあるオペレーションには、SDK によってオペレーションコードが実行されずに保存された結果が返されます。

    注記

    子コンテキストの結果がチェックポイントの最大サイズ (256 KB) より大きい場合、再生中にコンテキストのコードが再度実行されます。コンテキスト内で実行された耐久性のあるオペレーションで大規模な結果を構築できます。コンテキストはチェックポイントログから検索されます。したがって、コンテキスト自体で決定的コードのみを実行することが不可欠です。大規模な結果を伴う子コンテキストを使用するとき、ステップ内で長時間の作業または非決定的な作業を実行し、コンテキスト自体に結果を組み合わせた短時間実行のタスクのみを実行することがベストプラクティスです。

  4. 中断ポイントでの再開: SDK がチェックポイントなしでオペレーションに到達すると、正常に実行されて、耐久性のあるオペレーションが完了すると新しいチェックポイントが作成されます。

この再生メカニズムでは、コードが決定的である必要があります。同じ入力およびチェックポイントログの場合、関数は耐久性のあるオペレーションの呼び出しを同じシーケンスで実行する必要があります。SDK によってリプレイ中にオペレーション名およびタイプがチェックポイントログと一致することが確認されることで、これが実施されます。

利用可能な耐久性のあるオペレーション

DurableContext は、さまざまな調整パターンのオペレーションを提供します。耐久性のある各オペレーションによってチェックポイントが自動的に作成され、関数はどの時点からでも再開できます。

Steps

自動チェックポイントおよび再試行によってビジネスロジックが実行されます。外部サービスを呼び出したり、計算を実行したり、チェックポイントする必要があるロジックを実行したりするオペレーションのステップを使用します。SDK によってステップの前後にチェックポイントが作成され、結果が再生用に保存されます。

TypeScript
const result = await context.step('process-payment', async () => { return await paymentService.charge(amount); });
Python
result = context.step( lambda _: payment_service.charge(amount), name='process-payment' )

ステップには、設定可能な再試行戦略、実行セマンティクス (at-most-once または at-least-once)、カスタムシリアル化がサポートされます。

待機

コンピューティングリソースを消費せずに、指定された期間に実行が一時停止されます。SDK によってチェックポイントが作成され、関数の呼び出しが終了され、再開がスケジューリングされます。待機が完了すると Lambda によって関数が再度呼び出され、SDK によって続行される前に待機ポイントまで再生されます。

TypeScript
// Wait 1 hour without charges await context.wait({ seconds: 3600 });
Python
# Wait 1 hour without charges context.wait(3600)

コールバック

コールバックにより、関数は一時停止して外部システムが入力するまで待つことができます。コールバックを作成すると、SDK によって一意のコールバック ID が生成され、チェックポイントが作成されます。その後、関数はコンピューティング料金を発生させずに停止 (呼び出しを終了する) します。外部システムは SendDurableExecutionCallbackSuccess または SendDurableExecutionCallbackFailure Lambda API を使用し、コールバック結果を送信します。コールバックが送信されると、Lambda は関数を再度呼び出します。SDK によってコールバックポイントまで再生され、関数によってコールバック結果が継続されます。

SDK には、コールバックを操作する 2 つの方法があります。

createCallback: コールバックが作成され、約束およびコールバック ID の両方が返されます。コールバック ID を外部システムに送信し、Lambda API を使用して結果が送信されます。

TypeScript
const [promise, callbackId] = await context.createCallback('approval', { timeout: { hours: 24 } }); await sendApprovalRequest(callbackId, requestData); const approval = await promise;
Python
callback = context.create_callback( name='approval', config=CallbackConfig(timeout_seconds=86400) ) context.step( lambda _: send_approval_request(callback.callback_id), name='send_request' ) approval = callback.result()

waitForCallback: コールバックの作成および送信を 1 回のオペレーションに組み合わせることで、コールバック処理が簡素化されます。SDK によってコールバックが作成され、コールバック ID で送信者関数が実行されて結果を待機します。

TypeScript
const result = await context.waitForCallback( 'external-api', async (callbackId, ctx) => { await submitToExternalAPI(callbackId, requestData); }, { timeout: { minutes: 30 } } );
Python
result = context.wait_for_callback( lambda callback_id: submit_to_external_api(callback_id, request_data), name='external-api', config=WaitForCallbackConfig(timeout_seconds=1800) )

関数が無期限に待機しないようにタイムアウトを設定します。コールバックがタイムアウトした場合、SDK によって CallbackError がスローされ、関数はタイムアウトケースを処理できます。長時間のコールバックにハートビートタイムアウトが使用され、外部システムの応答が停止したタイミングが検出されます。

コールバックはヒューマンインザループのワークフロー、外部システム統合、ウェブフックのレスポンス、外部入力のために実行を一時停止する必要があるシナリオに使用します。

同時実行

複数のオペレーションは、オプションの同時実行制御と同時に実行します。SDK によって並列実行が管理され、オペレーションごとにチェックポイントが作成されて、完了ポリシーに従って障害が処理されます。

TypeScript
const results = await context.parallel([ async (ctx) => ctx.step('task1', async () => processTask1()), async (ctx) => ctx.step('task2', async () => processTask2()), async (ctx) => ctx.step('task3', async () => processTask3()) ]);
Python
results = context.parallel( lambda ctx: ctx.step(lambda _: process_task1(), name='task1'), lambda ctx: ctx.step(lambda _: process_task2(), name='task2'), lambda ctx: ctx.step(lambda _: process_task3(), name='task3') )

parallel を使用して、独立したオペレーションを同時に実行します。

マップ

オプションの同時実行制御を使用して、配列の各項目に対してオペレーションを同時に実行します。SDK によって同時実行が管理され、オペレーションごとにチェックポイントが作成されて、完了ポリシーに従って障害が処理されます。

TypeScript
const results = await context.map(itemArray, async (ctx, item, index) => ctx.step('task', async () => processItem(item, index)) );
Python
results = context.map( item_array, lambda ctx, item, index: ctx.step( lambda _: process_item(item, index), name='task' ) )

map を使用して同時実行制御で配列を処理します。

子コンテキスト

グループ化オペレーションの分離された実行コンテキストが作成されます。子コンテキストには独自のチェックポイントログがあり、複数のステップ、待機、その他のオペレーションが含める場合があります。SDK により、子コンテキスト全体が再試行および復旧の単一ユニットとして扱われます。

子コンテキストを使用して複雑なワークフローを整理したり、サブワークフローを実装したり、まとめて再試行するオペレーションを分離したりします。

TypeScript
const result = await context.runInChildContext( 'batch-processing', async (childCtx) => { return await processBatch(childCtx, items); } );
Python
result = context.run_in_child_context( lambda child_ctx: process_batch(child_ctx, items), name='batch-processing' )

再生メカニズムでは、耐久性のあるオペレーションが決定的な順序で実行されることが求められます。複数の子コンテキストを使用すると、複数の作業ストリームを同時に実行することができ、決定論は各コンテキスト内で個別に適用されます。複数の CPU コアを効率的に利用する高性能な関数を構築できます。

例えば、A および B という 2 つの子コンテキストを起動することを想定します。最初の呼び出しでは、コンテキスト内のステップは次の順序で実行され、「A」ステップは「B」ステップと同時に実行されます (A1、B1、B2、A2、A3)。再生すると、結果はチェックポイントログから取得されて、ステップが異なる順序 (B1、A1、A2、B2、A3) で発生するため、タイミングが大幅に速くなります。「A」ステップは正しい順序 (A1、A2、A3) で発生し、「B」ステップは正しい順序 (B1、B2) で発生したため、決定論の必要性は正しく満たされました。

条件付き待機

試行間に自動チェックポイントを適用した条件によるポーリング。SDK によってチェック関数が実行され、結果を含むチェックポイントが作成されて、戦略に従って待機して条件が満たされるまで繰り返されます。

TypeScript
const result = await context.waitForCondition( async (state, ctx) => { const status = await checkJobStatus(state.jobId); return { ...state, status }; }, { initialState: { jobId: 'job-123', status: 'pending' }, waitStrategy: (state) => state.status === 'completed' ? { shouldContinue: false } : { shouldContinue: true, delay: { seconds: 30 } } } );
Python
result = context.wait_for_condition( lambda state, ctx: check_job_status(state['jobId']), config=WaitForConditionConfig( initial_state={'jobId': 'job-123', 'status': 'pending'}, wait_strategy=lambda state, attempt: {'should_continue': False} if state['status'] == 'completed' else {'should_continue': True, 'delay': 30} ) )

外部システムのポーリング、リソース準備の待機、バックオフによる再試行の実装に waitForCondition を使用します。

関数の呼び出し

別の Lambda 関数を呼び出して、結果を待機します。SDK によってチェックポイントが作成され、ターゲット関数が呼び出され、呼び出しが完了すると関数が再開されます。関数の構成およびワークフローの分解を実現します。

TypeScript
const result = await context.invoke( 'invoke-processor', 'arn:aws:lambda:us-east-1:123456789012:function:processor', { data: inputData } );
Python
result = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:processor', {'data': input_data}, name='invoke-processor' )

耐久性のあるオペレーションの計測方法

DurableContext を通じて呼び出す耐久性のあるすべてのオペレーションによってチェックポイントが作成され、実行の進行状況が追跡されて状態データが保存されます。これらのオペレーションは使用状況に基づいて料金が発生し、チェックポイントにはデータの書き込みおよび保持コストに寄与するデータが含まれる場合があります。保存データには、呼び出しイベントデータ、ステップから返されたペイロード、コールバックの完了時に渡されたデータが含まれます。耐久性のあるオペレーションの計測方法を理解することで、実行コストを見積もってワークフローを最適化できます。料金の詳細については、「Lambda の料金ページ」を参照してください。

ペイロードサイズとは、耐久性のあるオペレーションが保持されるシリアル化されたデータのサイズを指します。データはバイト単位で測定され、サイズはオペレーションで使用されるシリアライザーによって異なります。オペレーションのペイロードは、正常な完了による結果自体であるか、オペレーションが失敗した場合はシリアル化されたエラーオブジェクトの可能性があります。

基本的なオペレーション

基本的なオペレーションは、耐久性のある関数の基礎的な構成要素です。

Operation チェックポイントのタイミング オペレーション数 継続したデータ
実行 Started 1 入力ペイロードのサイズ
実行 完了 (成功/失敗/停止) 0 出力ペイロードのサイズ
Step 再試行/成功/失敗 1 + N 回の再試行 各試行から返されるペイロードのサイズ
待ちます Started 1 該当なし
WaitForCondition 各ポーリング試行 1 + N ポーリング 各ポーリング試行から返されるペイロードサイズ
呼び出しレベルの再試行 Started 1 エラーオブジェクトのペイロード

コールバックオペレーション

コールバックオペレーションは、関数が一時停止して外部システムが入力するまで待てるようにします。これらのオペレーションは、コールバックの作成時および完了時にチェックポイントを作成します。

Operation チェックポイントのタイミング オペレーション数 継続したデータ
CreateCallback Started 1 該当なし
API コールによるコールバック完了 完了 0 コールバックペイロード
WaitForCallback Started 3 + N 回の再試行 (コンテキスト + コールバック + ステップ) 送信者ステップの試行によって返されるペイロード、ならびにコールバックペイロードの 2 つのコピー

複合オペレーション

複合オペレーションでは複数の耐久性のあるオペレーションを組み合わせて、並列実行、配列処理、ネストされたコンテキストなどの複雑な調整パターンが処理されます。

Operation チェックポイントのタイミング オペレーション数 継続したデータ
並行 Started 1 + N ブランチ (1 親コンテキスト + N 子コンテキスト) 各ブランチから返されたペイロードサイズの最大 2 コピー、ならびに各ブランチのステータス
マッピング Started 1 + N ブランチ (1 親コンテキスト + N 子コンテキスト) 各イテレーションから返されたペイロードサイズの最大 2 コピー、ならびに各イテレーションのステータス
約束ヘルパー 完了 1 約束から返されたペイロードサイズ
RunInChildContext 成功/失敗 1 子コンテキストから返されるペイロードサイズ

runInChildContext から、または複合オペレーションによって内部的に使用されるコンテキストなどの場合、256 KB 未満の結果は直接チェックポイントされます。大規模な結果は保存されずに、コンテキストのオペレーションが再処理されることでリプレイ中に再構築されます。