本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
耐用的執行 SDK
耐用執行 SDK 是建置耐用函數的基礎。它提供檢查點進度、處理重試和管理執行流程所需的基本概念。SDK 可抽象化檢查點管理和重播的複雜性,讓您撰寫可自動容錯的循序程式碼。
開發套件適用於 JavaScript、TypeScript、Python 和 Java (預覽版)。如需完整的 API 文件和範例,請參閱 GitHub 上的 JavaScript/TypeScript SDK、Python SDK 和 Java SDK。
DurableContext
開發套件為您的函數提供公開所有耐久操作的DurableContext物件。此內容會取代標準 Lambda 內容,並提供建立檢查點、管理執行流程以及與外部系統協調的方法。
若要使用 SDK,請使用耐用的執行包裝函式來包裝 Lambda 處理常式:
- TypeScript
-
import { withDurableExecution, DurableContext } from '@aws/durable-execution-sdk-js';
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
// Your function receives DurableContext instead of Lambda context
// Use context.step(), context.wait(), etc.
return result;
}
);
- Python
-
from aws_durable_execution_sdk_python import durable_execution, DurableContext
@durable_execution
def handler(event: dict, context: DurableContext):
# Your function receives DurableContext
# Use context.step(), context.wait(), etc.
return result
- Java (Preview)
-
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
public class Handler extends DurableHandler<Object, String> {
@Override
public String handleRequest(Object input, DurableContext context) {
// Your function receives DurableContext
// Use context.step(), context.wait(), etc.
return result;
}
}
包裝函式會攔截函數叫用、載入任何現有的檢查點日誌,並提供DurableContext管理重播和檢查點的 。
開發套件的功能
開發套件會處理三個關鍵責任,以實現持久的執行:
檢查點管理:軟體開發套件會在函數執行耐久操作時自動建立檢查點。每個檢查點都會記錄操作類型、輸入和結果。函數完成步驟時,SDK 會保留檢查點再繼續。這可確保函數可以在中斷時從任何已完成的操作恢復。
重播協調:當函數在暫停或中斷後恢復時,軟體開發套件會執行重播。它會從頭開始執行程式碼,但會略過已完成的操作,使用儲存的檢查點結果,而不是重新執行它們。SDK 可確保重播是確定性的,因為提供相同的輸入和檢查點日誌,您的函數會產生相同的結果。
狀態隔離:SDK 會與您的業務邏輯分開維護執行狀態。每個持久性執行都有自己的檢查點日誌,其他執行無法存取。SDK 會加密靜態檢查點資料,並確保狀態在重播之間保持一致。
檢查點的運作方式
當您呼叫耐用操作時,軟體開發套件會遵循以下順序:
檢查現有檢查點:軟體開發套件會檢查此操作是否已在先前的調用中完成。如果檢查點存在,軟體開發套件會傳回儲存的結果,而無需重新執行操作。
執行操作:如果沒有檢查點,則 SDK 會執行您的操作程式碼。對於步驟,這表示呼叫您的 函數。對於等待,這表示排程恢復。
建立檢查點:操作完成後,開發套件會序列化結果並建立檢查點。檢查點包含操作類型、名稱、輸入、結果和時間戳記。
持久性檢查點: SDK 會呼叫 Lambda 檢查點 API 來持久性檢查點。這可確保檢查點在繼續執行之前是耐用的。
傳回結果:軟體開發套件會將操作結果傳回至您的程式碼,繼續下一個操作。
此序列可確保一旦操作完成,就會安全地存放其結果。如果您的函數在任何時間點中斷,開發套件可以重新執行到最後完成的檢查點。
重播行為
當您的函數在暫停或中斷後繼續時,軟體開發套件會執行重播:
載入檢查點日誌: SDK 會從 Lambda 擷取此執行的檢查點日誌。
從頭開始執行:軟體開發套件會從頭調用您的處理常式函數,而不是從暫停的位置調用。
-
略過已完成的持久性操作:當程式碼呼叫持久性操作時,軟體開發套件會根據檢查點日誌檢查每個操作。對於已完成的耐久操作,軟體開發套件會傳回預存結果,而不執行操作程式碼。
如果子內容的結果大於檢查點大小上限 (256 KB),則在重播期間會再次執行內容的程式碼。這可讓您從在內容中執行的耐久操作建構大型結果,這些操作將從檢查點日誌中查詢。因此,只能在內容本身中執行確定性程式碼。使用具有大型結果的子內容時,最佳實務是在步驟內執行長時間執行或非確定性工作,並且只執行將結果合併到內容本身的短期執行任務。
在中斷點繼續:當開發套件在沒有檢查點的情況下達到 操作時,它會正常執行,並在耐久操作完成時建立新的檢查點。
此重播機制需要您的程式碼具有決定性。假設輸入和檢查點日誌相同,您的函數必須進行相同序列的耐久操作呼叫。SDK 透過驗證操作名稱和類型在重播期間符合檢查點日誌來強制執行此操作。
可用的耐用操作
DurableContext 提供不同協調模式的操作。每個耐用的操作都會自動建立檢查點,確保您的函數可以隨時恢復。
步驟
使用自動檢查點和重試來執行商業邏輯。對呼叫外部服務、執行計算或執行任何應檢查點邏輯的操作使用步驟。軟體開發套件會在步驟前後建立檢查點,存放結果以進行重播。
- TypeScript
-
const result = await context.step('process-payment', async () => {
return await paymentService.charge(amount);
});
- Python
-
result = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
- Java (Preview)
-
var result = context.step("process-payment", Payment.class,
() -> paymentService.charge(amount)
);
步驟支援可設定的重試策略、執行語意 (at-most-once或at-least-once和自訂序列化。
等待
在指定的持續時間內暫停執行,而不耗用運算資源。SDK 會建立檢查點、終止函數叫用,以及排程恢復。當等待完成時,Lambda 會再次叫用您的函數,而 SDK 會在繼續之前重播至等待點。
- TypeScript
-
// Wait 1 hour without charges
await context.wait({ seconds: 3600 });
- Python
-
# Wait 1 hour without charges
context.wait(3600)
- Java (Preview)
-
// Wait 1 hour without charges
context.wait(Duration.ofHours(1));
回呼
回呼可讓函數暫停並等待外部系統提供輸入。當您建立回呼時,開發套件會產生唯一的回呼 ID 並建立檢查點。然後,您的函數會暫停 (終止調用),而不會產生運算費用。外部系統使用 SendDurableExecutionCallbackSuccess或 SendDurableExecutionCallbackFailure Lambda APIs提交回呼結果。提交回呼時,Lambda 會再次叫用您的函數,開發套件會重播至回呼點,而函數會繼續回呼結果。
開發套件提供兩種使用回呼的方法:
createCallback:建立回呼,並同時傳回 promise 和回呼 ID。您可以將回呼 ID 傳送至外部系統,該系統會使用 Lambda API 提交結果。
- TypeScript
-
const [promise, callbackId] = await context.createCallback('approval', {
timeout: { hours: 24 }
});
await sendApprovalRequest(callbackId, requestData);
const approval = await promise;
- Python
-
callback = context.create_callback(
name='approval',
config=CallbackConfig(timeout_seconds=86400)
)
context.step(
lambda _: send_approval_request(callback.callback_id),
name='send_request'
)
approval = callback.result()
- Java (Preview)
-
var config = CallbackConfig.builder(Duration.ofHours(24)).timeout()
var callback = context.createCallback("approval", String.class, config);
context.step("send-request", String.class, () -> {
notificationService.sendApprovalRequest(callback.callbackId(), requestData);
return "request-sent";
});
// Blocks until the callback finishes or times out
String approval = callback.get();
waitForCallback:將回呼建立和提交結合在一個操作中,簡化回呼處理。開發套件會建立回呼、使用回呼 ID 執行您的提交者函數,並等待結果。
- TypeScript
-
const result = await context.waitForCallback(
'external-api',
async (callbackId, ctx) => {
await submitToExternalAPI(callbackId, requestData);
},
{ timeout: { minutes: 30 } }
);
- Python
-
result = context.wait_for_callback(
lambda callback_id: submit_to_external_api(callback_id, request_data),
name='external-api',
config=WaitForCallbackConfig(timeout_seconds=1800)
)
- Java (Preview)
-
waitForCallback 仍在開發 Java。
設定逾時以防止函數無限期等待。如果回呼逾時,開發套件會擲回 CallbackError,而您的函數可以處理逾時案例。針對長時間執行的回呼使用活動訊號逾時,以偵測外部系統何時停止回應。
將回呼用於human-in-the-loop工作流程、外部系統整合、Webhook 回應或任何執行必須針對外部輸入暫停的情況。
平行執行
與選用並行控制同時執行多個操作。SDK 會管理平行執行、為每個操作建立檢查點,以及根據您的完成政策處理失敗。
- TypeScript
-
const results = await context.parallel([
async (ctx) => ctx.step('task1', async () => processTask1()),
async (ctx) => ctx.step('task2', async () => processTask2()),
async (ctx) => ctx.step('task3', async () => processTask3())
]);
- Python
-
results = context.parallel(
lambda ctx: ctx.step(lambda _: process_task1(), name='task1'),
lambda ctx: ctx.step(lambda _: process_task2(), name='task2'),
lambda ctx: ctx.step(lambda _: process_task3(), name='task3')
)
- Java (Preview)
-
平行處理仍在開發 Java。
使用 parallel 可同時執行獨立操作。
Map
使用選用並行控制,同時對陣列中的每個項目執行 操作。SDK 會管理並行執行、為每個操作建立檢查點,以及根據您的完成政策處理失敗。
- TypeScript
-
const results = await context.map(itemArray, async (ctx, item, index) =>
ctx.step('task', async () => processItem(item, index))
);
- Python
-
results = context.map(
item_array,
lambda ctx, item, index: ctx.step(
lambda _: process_item(item, index),
name='task'
)
)
- Java (Preview)
-
地圖仍在開發 Java。
使用 map 處理具有並行控制的陣列。
子內容
建立用於分組操作的隔離執行內容。子內容有自己的檢查點日誌,可包含多個步驟、等待和其他操作。SDK 會將整個子內容視為單一單位,以供重試和復原。
使用子內容來組織複雜的工作流程、實作子工作流程,或隔離應同時重試的操作。
- TypeScript
-
const result = await context.runInChildContext(
'batch-processing',
async (childCtx) => {
return await processBatch(childCtx, items);
}
);
- Python
-
result = context.run_in_child_context(
lambda child_ctx: process_batch(child_ctx, items),
name='batch-processing'
)
- Java (Preview)
-
var result = context.runInChildContext(
"batch-processing",
String.class,
childCtx -> process_batch(childCtx, items)
);
重播機制要求以決定性順序進行持久性操作。使用多個子內容,您可以同時執行多個工作串流,而且決定性會分別套用在每個內容中。這可讓您建置高效能函數,以有效率地利用多個 CPU 核心。
例如,假設我們啟動兩個子內容 A 和 B。在初始調用時,內容中的步驟會依此順序執行,而 'A' 步驟會與 'B' 步驟同時執行:A1, B1, B2, A2, A3。重新執行時,從檢查點日誌擷取結果時,時間會快得多,而步驟的發生順序也不同:B1, A1, A2, B2, A3。由於「A」步驟的順序正確 (A1、A2, A3),且「B」步驟的順序正確 (B1、B2),因此正確滿足確定性的需求。
條件式等待
輪詢在嘗試之間具有自動檢查點的條件。SDK 會執行您的檢查函數、建立具有結果的檢查點、根據您的策略等待,並重複直到滿足條件為止。
- TypeScript
-
const result = await context.waitForCondition(
async (state, ctx) => {
const status = await checkJobStatus(state.jobId);
return { ...state, status };
},
{
initialState: { jobId: 'job-123', status: 'pending' },
waitStrategy: (state) =>
state.status === 'completed'
? { shouldContinue: false }
: { shouldContinue: true, delay: { seconds: 30 } }
}
);
- Python
-
result = context.wait_for_condition(
lambda state, ctx: check_job_status(state['jobId']),
config=WaitForConditionConfig(
initial_state={'jobId': 'job-123', 'status': 'pending'},
wait_strategy=lambda state, attempt:
{'should_continue': False} if state['status'] == 'completed'
else {'should_continue': True, 'delay': 30}
)
)
- Java (Preview)
-
waitForCondition 仍在開發 Java。
使用 waitForCondition輪詢外部系統、等待資源準備就緒,或使用退避實作重試。
函數調用
叫用另一個 Lambda 函數並等待其結果。SDK 會建立檢查點、叫用目標函數,並在叫用完成時繼續您的函數。這可啟用函數合成和工作流程分解。
- TypeScript
-
const result = await context.invoke(
'invoke-processor',
'arn:aws:lambda:us-east-1:123456789012:function:processor:1',
{ data: inputData }
);
- Python
-
result = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:processor:1',
{'data': input_data},
name='invoke-processor'
)
- Java (Preview)
-
var result = context.invoke(
"invoke-processor",
"arn:aws:lambda:us-east-1:123456789012:function:processor:1",
inputData,
Result.class,
InvokeConfig.builder().build()
);
如何計量耐久性操作
您透過 呼叫的每個耐用操作都會DurableContext建立檢查點,以追蹤執行進度並存放狀態資料。這些操作會根據其用量產生費用,而檢查點可能包含對您的資料寫入和保留成本有所貢獻的資料。儲存的資料包括調用事件資料、步驟傳回的承載,以及完成回呼時傳遞的資料。了解耐用操作的計量方式可協助您預估執行成本並最佳化工作流程。如需定價的詳細資訊,請參閱 Lambda 定價頁面。
承載大小是指持久性操作持續的序列化資料大小。資料是以位元組為單位測量,大小可能會因 操作使用的序列化程式而有所不同。操作的承載可能是成功完成的結果本身,如果操作失敗,可能是序列化錯誤物件。
基本操作
基本操作是耐用函數的基本建置區塊:
| 作業 |
檢查點計時 |
操作數量 |
資料持續存在 |
| 執行 |
已開始 |
1 |
輸入承載大小 |
| 執行 |
已完成 Succeeded/Failed/Stopped) |
0 |
輸出承載大小 |
| Step (步驟) |
Retry/Succeeded/Failed |
1 + N 次重試 |
每次嘗試傳回的承載大小 |
| 等候 |
已開始 |
1 |
N/A |
| WaitForCondition |
每次輪詢嘗試 |
1 + N 輪詢 |
每次輪詢嘗試傳回的承載大小 |
| 調用層級重試 |
已開始 |
1 |
錯誤物件的承載 |
回呼操作
回呼操作可讓您的函數暫停並等待外部系統提供輸入。這些操作會在建立回呼和完成時建立檢查點:
| 作業 |
檢查點計時 |
操作數量 |
資料持續存在 |
| CreateCallback |
已開始 |
1 |
N/A |
| 透過 API 呼叫完成回呼 |
已完成 |
0 |
回呼承載 |
| WaitForCallback |
已開始 |
3 + N 次重試 (內容 + 回呼 + 步驟) |
提交者步驟嘗試傳回的承載,加上兩個回呼承載副本 |
複合操作
複合操作結合多個耐久操作來處理複雜的協調模式,例如平行執行、陣列處理和巢狀內容:
| 作業 |
檢查點計時 |
操作數量 |
資料持續存在 |
| 平行 |
已開始 |
1 + N 個分支 (1 個父內容 + N 個子內容) |
每個分支最多兩個傳回承載大小的副本,以及每個分支的狀態 |
| Map |
已開始 |
1 + N 個分支 (1 個父內容 + N 個子內容) |
每次反覆運算傳回的承載大小最多兩個副本,加上每次反覆運算的狀態 |
| Promise 協助程式 |
已完成 |
1 |
從 promise 傳回的承載大小 |
| RunInChildContext |
成功/失敗 |
1 |
從子內容傳回的承載大小 |
對於內容,例如來自 runInChildContext或由複合操作內部使用,小於 256 KB 的結果會直接檢查點。不會儲存較大的結果,而是透過重新處理內容的操作,在重播期間重建結果。