Examples and use cases - AWS Lambda


Examples and use cases

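Lambda durable functions let you build fault-tolerant, multi-step applications using durable operations such as steps and waits. They rely on automatic checkpointing and a checkpoint-replay model: after a failure, execution restarts from the beginning but skips checkpoints that have already completed, so your function can recover from failures and continue without losing progress.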
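To make the checkpoint-replay model concrete, here is a minimal in-memory sketch. ToyDurableContext, run_with_replay, and the reserve/charge functions are hypothetical illustrations, not the SDK's API or implementation; the sketch assumes steps are identified by name and that a retry means calling the handler again from the top with the same checkpoint log.

```python
from typing import Any, Callable, Dict, List


class ToyDurableContext:
    """Hypothetical in-memory stand-in for a durable context; not the SDK's API."""

    def __init__(self, checkpoints: Dict[str, Any]) -> None:
        # The checkpoint log outlives any single invocation
        self.checkpoints = checkpoints

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name in self.checkpoints:
            # Replay: hand back the recorded result and skip the work
            return self.checkpoints[name]
        result = fn()
        self.checkpoints[name] = result  # checkpoint only after success
        return result


calls: List[str] = []
payment_fails_next = [True]  # simulate one transient payment failure


def reserve() -> str:
    calls.append("reserve")
    return "reserved"


def charge() -> str:
    calls.append("charge")
    if payment_fails_next[0]:
        payment_fails_next[0] = False
        raise RuntimeError("transient payment failure")
    return "paid"


def handler(ctx: ToyDurableContext) -> Dict[str, str]:
    inventory = ctx.step("reserve-inventory", reserve)
    payment = ctx.step("process-payment", charge)
    return {"inventory": inventory, "payment": payment}


def run_with_replay(max_attempts: int = 3) -> Dict[str, str]:
    checkpoints: Dict[str, Any] = {}
    for _ in range(max_attempts):
        try:
            # Each attempt restarts the handler from the top
            return handler(ToyDurableContext(checkpoints))
        except RuntimeError:
            continue
    raise RuntimeError("still failing after retries")


result = run_with_replay()
```

After one failed attempt and one replay, reserve has run once and charge twice: the replay returned the stored reservation instead of repeating it.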

Short-duration fault-tolerant processes

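Use durable functions to build reliable operations that typically complete within minutes. Although these processes are shorter than long-running workflows, they still benefit from automatic checkpointing and fault tolerance across distributed systems. Durable functions ensure that your multi-step processes complete successfully even when individual service calls fail, without requiring complex error-handling or state-management code.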

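Common scenarios include hotel booking systems, restaurant reservation platforms, ride-sharing trip requests, event ticket purchases, and SaaS subscription upgrades. These scenarios share common characteristics: multiple service calls that must complete together, the need for automatic retries on transient failures, and the need to maintain consistent state across distributed systems.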

Distributed transactions across microservices

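Coordinate payment, inventory, and shipping across multiple services with automatic rollback on failure. Each service operation is wrapped in a step, so the transaction can recover from any point if a service fails.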

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, amount, items } = event;

    // Reserve inventory across multiple warehouses
    const inventory = await context.step("reserve-inventory", async () => {
      return await inventoryService.reserve(items);
    });

    // Process payment
    const payment = await context.step("process-payment", async () => {
      return await paymentService.charge(amount);
    });

    // Create shipment
    const shipment = await context.step("create-shipment", async () => {
      return await shippingService.createShipment(orderId, inventory);
    });

    return { orderId, status: 'completed', shipment };
  }
);
```
Python
```python
from aws_durable_execution_sdk import DurableContext, durable_execution

@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    amount = event['amount']
    items = event['items']

    # Reserve inventory across multiple warehouses
    inventory = context.step(
        lambda _: inventory_service.reserve(items),
        name='reserve-inventory'
    )

    # Process payment
    payment = context.step(
        lambda _: payment_service.charge(amount),
        name='process-payment'
    )

    # Create shipment
    shipment = context.step(
        lambda _: shipping_service.create_shipment(order_id, inventory),
        name='create-shipment'
    )

    return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
```

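If any step fails, the function automatically retries from the last successful checkpoint. The inventory reservation persists even if payment processing fails temporarily. When the function retries, it skips the completed inventory step and proceeds directly to payment processing. This eliminates duplicate reservations and keeps the state of your distributed system consistent.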
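The example above recovers by retrying forward from the last checkpoint; it does not itself undo completed steps. If a step fails permanently (for example, a declined card) and you want the rollback behavior described for this pattern, one common approach, assumed here rather than shown in the example, is to pair each action with a compensating action and run the compensations in reverse order (the saga pattern). The sketch below is plain Python; run_with_compensation, PermanentFailure, and the action names are hypothetical.

```python
from typing import Callable, List, Tuple

# (name, action, compensation) triples; all names here are hypothetical
Action = Tuple[str, Callable[[], None], Callable[[], None]]


class PermanentFailure(Exception):
    """Hypothetical error type for failures that retrying will not fix."""


def run_with_compensation(actions: List[Action]) -> List[str]:
    completed: List[Tuple[str, Callable[[], None]]] = []
    try:
        for name, do, undo in actions:
            do()
            completed.append((name, undo))
    except PermanentFailure:
        # Roll back in reverse order: the last completed action is undone first
        for _, undo in reversed(completed):
            undo()
        raise
    return [name for name, _ in completed]


log: List[str] = []


def decline_payment() -> None:
    raise PermanentFailure("card declined")


order_actions: List[Action] = [
    ("reserve-inventory", lambda: log.append("reserve"), lambda: log.append("release")),
    ("process-payment", decline_payment, lambda: log.append("refund")),
]

try:
    run_with_compensation(order_actions)
except PermanentFailure:
    log.append("order-failed")
```

Only the reservation is released; the refund never runs because the payment never completed. In a durable function you would likely wrap each action and each compensation in its own step so that the rollback is checkpointed as well.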

Order processing with multiple steps

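Process orders through validation, payment authorization, inventory allocation, and fulfillment, with automatic retry and recovery. Each step is checkpointed, so the order keeps progressing even when individual steps fail and retry.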

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, customerId, items } = event;

    // Validate order details
    const validation = await context.step("validate-order", async () => {
      const customer = await customerService.validate(customerId);
      const itemsValid = await inventoryService.validateItems(items);
      return { customer, itemsValid };
    });

    if (!validation.itemsValid) {
      return { orderId, status: 'rejected', reason: 'invalid_items' };
    }

    // Authorize payment
    const authorization = await context.step("authorize-payment", async () => {
      return await paymentService.authorize(
        validation.customer.paymentMethod,
        calculateTotal(items)
      );
    });

    // Allocate inventory
    const allocation = await context.step("allocate-inventory", async () => {
      return await inventoryService.allocate(items);
    });

    // Fulfill order
    const fulfillment = await context.step("fulfill-order", async () => {
      return await fulfillmentService.createShipment({
        orderId,
        items: allocation.allocatedItems,
        address: validation.customer.shippingAddress
      });
    });

    return {
      orderId,
      status: 'completed',
      trackingNumber: fulfillment.trackingNumber
    };
  }
);
```
Python
```python
from aws_durable_execution_sdk import DurableContext, durable_execution

@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    customer_id = event['customerId']
    items = event['items']

    # Validate order details
    def validate_order(_):
        customer = customer_service.validate(customer_id)
        items_valid = inventory_service.validate_items(items)
        return {'customer': customer, 'itemsValid': items_valid}

    validation = context.step(validate_order, name='validate-order')

    if not validation['itemsValid']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}

    # Authorize payment
    authorization = context.step(
        lambda _: payment_service.authorize(
            validation['customer']['paymentMethod'],
            calculate_total(items)
        ),
        name='authorize-payment'
    )

    # Allocate inventory
    allocation = context.step(
        lambda _: inventory_service.allocate(items),
        name='allocate-inventory'
    )

    # Fulfill order
    fulfillment = context.step(
        lambda _: fulfillment_service.create_shipment({
            'orderId': order_id,
            'items': allocation['allocatedItems'],
            'address': validation['customer']['shippingAddress']
        }),
        name='fulfill-order'
    )

    return {
        'orderId': order_id,
        'status': 'completed',
        'trackingNumber': fulfillment['trackingNumber']
    }
```

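This pattern ensures that orders never get stuck in an intermediate state. If validation fails, the order is rejected before payment authorization. If payment authorization fails, no inventory is allocated. Each step builds on the previous one, with automatic retry and recovery.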

Note

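The conditional check if (!validation.itemsValid) (if not validation['itemsValid'] in Python) is outside any step and re-executes during replay. This is safe because it is deterministic: given the same validation object, it always produces the same result.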

Long-running processes

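Use durable functions for processes that span hours, days, or weeks. Wait operations suspend execution without incurring compute charges, which makes long-running processes cost-effective. During a wait, your function stops running and Lambda recycles the execution environment. When it is time to resume, Lambda invokes your function again and replays it, returning stored results for completed operations, so execution continues from the last checkpoint.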
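One way to picture why a wait survives replay: the wait records its absolute deadline once, and every later pass compares the current time against that stored deadline instead of starting a new timer. The sketch below is a toy model under stated assumptions (the Suspend exception, ToyWaitContext, and the injected clock are hypothetical); in Lambda, the service handles scheduling the re-invocation.

```python
from typing import Dict, Optional


class Suspend(Exception):
    """Raised by the toy context to end the current invocation early."""

    def __init__(self, resume_at: float) -> None:
        super().__init__(f"suspended until t={resume_at}")
        self.resume_at = resume_at


class ToyWaitContext:
    """Hypothetical model of a checkpointed wait; not the SDK's implementation."""

    def __init__(self, checkpoints: Dict[str, float], now: float) -> None:
        self.checkpoints = checkpoints  # survives between invocations
        self.now = now  # injected clock keeps the example deterministic

    def wait(self, name: str, seconds: float) -> None:
        # The absolute deadline is recorded once, on the first pass
        deadline = self.checkpoints.setdefault(name, self.now + seconds)
        if self.now < deadline:
            # Stop here; something outside must re-invoke us at the deadline
            raise Suspend(deadline)
        # Deadline has passed: replay continues past the wait immediately


def handler(ctx: ToyWaitContext) -> str:
    ctx.wait("cool-down", 300)
    return "resumed"


store: Dict[str, float] = {}
first_resume: Optional[float] = None
early_resume: Optional[float] = None

try:
    handler(ToyWaitContext(store, now=1000.0))
except Suspend as s:
    first_resume = s.resume_at

try:
    # Re-invoked too early: the original deadline still applies
    handler(ToyWaitContext(store, now=1100.0))
except Suspend as s:
    early_resume = s.resume_at

result = handler(ToyWaitContext(store, now=1300.0))
```

Because the deadline is stored rather than recomputed, an early re-invocation does not extend the wait, and a re-invocation after the deadline moves straight past it.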

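This execution model makes durable functions a good fit for processes that need to pause for long periods, whether they are waiting for a human decision, a response from an external system, a scheduled processing window, or a time-based delay. You pay only for active compute time, not for time spent waiting.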

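Common scenarios include document approval processes, scheduled batch processing, multi-day onboarding processes, subscription trial processes, and delayed notification systems. These scenarios share common characteristics: extended wait periods measured in hours or days, the need to maintain execution state across those waits, and cost sensitivity that makes paying for idle compute time prohibitive.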

Human-in-the-loop approvals

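Pause execution for document reviews, approvals, or decisions while maintaining execution state. The function waits for an external callback without consuming resources and resumes automatically when the approval arrives.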

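This pattern is essential for processes that require human judgment or external validation. The function suspends at the callback point and incurs no compute charges while it waits. When someone submits a decision through the API, Lambda invokes your function again, replays it from the checkpoint, and continues with the approval result.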

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { documentId, reviewers } = event;

    // Step 1: Prepare document for review
    const prepared = await context.step("prepare-document", async () => {
      return await documentService.prepare(documentId);
    });

    // Step 2: Request approval with callback
    const approval = await context.waitForCallback(
      "approval-callback",
      async (callbackId) => {
        await notificationService.sendApprovalRequest({
          documentId,
          reviewers,
          callbackId,
          expiresIn: 86400
        });
      },
      { timeout: { seconds: 86400 } }
    );

    // Function resumes here when approval is received
    if (approval?.approved) {
      const finalized = await context.step("finalize-document", async () => {
        return await documentService.finalize(documentId, approval.comments);
      });

      return {
        status: 'approved',
        documentId,
        finalizedAt: finalized.timestamp
      };
    }

    // Handle rejection
    await context.step("archive-rejected", async () => {
      await documentService.archive(documentId, approval?.reason);
    });

    return {
      status: 'rejected',
      documentId,
      reason: approval?.reason
    };
  }
);
```
Python
```python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig

@durable_execution
def lambda_handler(event, context: DurableContext):
    document_id = event['documentId']
    reviewers = event['reviewers']

    # Step 1: Prepare document for review
    prepared = context.step(
        lambda _: document_service.prepare(document_id),
        name='prepare-document'
    )

    # Step 2: Request approval with callback
    def send_approval_request(callback_id):
        notification_service.send_approval_request({
            'documentId': document_id,
            'reviewers': reviewers,
            'callbackId': callback_id,
            'expiresIn': 86400
        })

    approval = context.wait_for_callback(
        send_approval_request,
        name='approval-callback',
        config=WaitConfig(timeout=86400)
    )

    # Function resumes here when approval is received
    if approval and approval.get('approved'):
        finalized = context.step(
            lambda _: document_service.finalize(document_id, approval.get('comments')),
            name='finalize-document'
        )

        return {
            'status': 'approved',
            'documentId': document_id,
            'finalizedAt': finalized['timestamp']
        }

    # Handle rejection
    context.step(
        lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
        name='archive-rejected'
    )

    return {
        'status': 'rejected',
        'documentId': document_id,
        'reason': approval.get('reason') if approval else None
    }
```

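When the callback is received and your function resumes, it replays from the beginning. The prepare-document step immediately returns its checkpointed result. The waitForCallback operation also immediately returns the stored approval result instead of waiting again. Execution then continues to the finalize or archive step.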
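The replay sequence just described can be modeled with a small in-memory sketch. ToyCallbackContext, the Suspend exception, and the "<name>-id" callback ID scheme are all hypothetical; the sketch assumes that sending the approval request is itself checkpointed (so a replay does not notify reviewers twice) and that a submitted decision is stored where the next invocation can read it.

```python
from typing import Any, Callable, Dict, List, Optional


class Suspend(Exception):
    """Ends the current toy invocation while a callback is outstanding."""


class ToyCallbackContext:
    """Hypothetical in-memory model of callback replay; not the SDK's API."""

    def __init__(self, checkpoints: Dict[str, Any], callbacks: Dict[str, Any]) -> None:
        self.checkpoints = checkpoints  # persisted step results
        self.callbacks = callbacks  # decisions submitted from outside

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name not in self.checkpoints:
            self.checkpoints[name] = fn()
        return self.checkpoints[name]

    def wait_for_callback(self, name: str, submitter: Callable[[str], None]) -> Any:
        callback_id = f"{name}-id"  # hypothetical ID scheme
        # Sending the request is checkpointed, so replays do not resend it
        self.step(f"{name}:submit", lambda: submitter(callback_id))
        if callback_id not in self.callbacks:
            raise Suspend(name)
        return self.callbacks[callback_id]


sent: List[str] = []
prepared: List[str] = []
checkpoints: Dict[str, Any] = {}
callbacks: Dict[str, Any] = {}


def handler(ctx: ToyCallbackContext) -> str:
    ctx.step("prepare-document", lambda: prepared.append("doc-1"))
    approval = ctx.wait_for_callback("approval-callback", sent.append)
    return "approved" if approval.get("approved") else "rejected"


first: Optional[str] = None
try:
    # Invocation 1: prepares the document, sends the request, then suspends
    first = handler(ToyCallbackContext(checkpoints, callbacks))
except Suspend:
    pass

# A reviewer submits a decision (in Lambda this goes through the callback API)
callbacks["approval-callback-id"] = {"approved": True}

# Invocation 2: replays from the top and returns the stored decision
second = handler(ToyCallbackContext(checkpoints, callbacks))
```

Across both invocations the document is prepared once and the request is sent once, while the handler body runs from the top each time.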

Multi-stage data pipelines

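Process large datasets through extraction, transformation, and loading stages, with checkpoints between stages. Each stage can take hours to complete, and the checkpoints ensure that the pipeline can resume from any stage if it is interrupted.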

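This pattern is ideal for ETL workflows, data migrations, and batch processing jobs where you need to process data in stages with recovery points. If a stage fails, the pipeline resumes from the last completed stage instead of restarting from the beginning. You can also use wait operations to pause between stages: to respect rate limits, to wait for downstream systems to be ready, or to schedule processing during off-peak hours.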

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { datasetId, batchSize } = event;

    // Stage 1: Extract data from source
    const extracted = await context.step("extract-data", async () => {
      const records = await sourceDatabase.extractRecords(datasetId);
      return { recordCount: records.length, records };
    });

    // Wait 5 minutes to respect source system rate limits
    await context.wait({ seconds: 300 });

    // Stage 2: Transform data in batches
    const transformed = await context.step("transform-data", async () => {
      const batches = chunkArray(extracted.records, batchSize);
      const results = [];
      for (const batch of batches) {
        // Renamed from `transformed` so it no longer shadows the outer variable
        const batchResult = await transformService.processBatch(batch);
        results.push(batchResult);
      }
      return { batchCount: batches.length, results };
    });

    // Wait until off-peak hours (e.g., 2 AM)
    const now = new Date();
    const targetHour = 2;
    const msUntilTarget = calculateMsUntilHour(now, targetHour);
    await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });

    // Stage 3: Load data to destination
    const loaded = await context.step("load-data", async () => {
      let loadedCount = 0;
      for (const result of transformed.results) {
        await destinationDatabase.loadBatch(result);
        loadedCount += result.length;
      }
      return { loadedCount };
    });

    // Stage 4: Verify and finalize
    const verified = await context.step("verify-pipeline", async () => {
      const verification = await destinationDatabase.verifyRecords(datasetId);
      await pipelineService.markComplete(datasetId, verification);
      return verification;
    });

    return {
      datasetId,
      recordsProcessed: extracted.recordCount,
      batchesProcessed: transformed.batchCount,
      recordsLoaded: loaded.loadedCount,
      verified: verified.success
    };
  }
);
```
Python
```python
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event['batchSize']

    # Stage 1: Extract data from source
    def extract_data(_):
        records = source_database.extract_records(dataset_id)
        return {'recordCount': len(records), 'records': records}

    extracted = context.step(extract_data, name='extract-data')

    # Wait 5 minutes to respect source system rate limits
    context.wait(300)

    # Stage 2: Transform data in batches
    def transform_data(_):
        batches = chunk_array(extracted['records'], batch_size)
        results = []
        for batch in batches:
            # Named batch_result to avoid confusion with the outer `transformed`
            batch_result = transform_service.process_batch(batch)
            results.append(batch_result)
        return {'batchCount': len(batches), 'results': results}

    transformed = context.step(transform_data, name='transform-data')

    # Wait until off-peak hours (e.g., 2 AM)
    now = datetime.now()
    target_hour = 2
    ms_until_target = calculate_ms_until_hour(now, target_hour)
    context.wait(ms_until_target // 1000)

    # Stage 3: Load data to destination
    def load_data(_):
        loaded_count = 0
        for result in transformed['results']:
            destination_database.load_batch(result)
            loaded_count += len(result)
        return {'loadedCount': loaded_count}

    loaded = context.step(load_data, name='load-data')

    # Stage 4: Verify and finalize
    def verify_pipeline(_):
        verification = destination_database.verify_records(dataset_id)
        pipeline_service.mark_complete(dataset_id, verification)
        return verification

    verified = context.step(verify_pipeline, name='verify-pipeline')

    return {
        'datasetId': dataset_id,
        'recordsProcessed': extracted['recordCount'],
        'batchesProcessed': transformed['batchCount'],
        'recordsLoaded': loaded['loadedCount'],
        'verified': verified['success']
    }
```

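Each stage is wrapped in a step, which creates a checkpoint and lets the pipeline resume from any stage if it is interrupted. The 5-minute wait between extraction and transformation respects the source system's rate limits without consuming compute resources, and the wait until 2 AM schedules the expensive load operation for off-peak hours.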
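The pipeline examples call chunkArray / chunk_array and calculateMsUntilHour / calculate_ms_until_hour without defining them. Below is one possible Python version of each helper; both are hypothetical sketches, and they assume now and the target hour are in the same timezone (daylight-saving transitions are ignored).

```python
from datetime import datetime, timedelta
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk_array(items: Sequence[T], size: int) -> List[List[T]]:
    """Hypothetical helper: split items into consecutive lists of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


def calculate_ms_until_hour(now: datetime, target_hour: int) -> int:
    """Hypothetical helper: milliseconds from `now` until the next occurrence of target_hour:00."""
    target = now.replace(hour=target_hour, minute=0, second=0, microsecond=0)
    if target <= now:
        # Today's target time has passed (or is exactly now), so aim for tomorrow
        target += timedelta(days=1)
    return int((target - now).total_seconds() * 1000)
```

Using <= rather than < means that a call made exactly at 2:00 schedules the next day's window instead of producing a zero-length wait; switch to < if you would rather run immediately in that case.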

Note

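The new Date() call and the calculateMsUntilHour() function are outside any step and re-execute during replay. For time-based operations that must be consistent across replays, calculate the timestamp inside a step, or use it only for wait durations (which are checkpointed).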
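To see why this matters, here is a toy model in which a clock reading taken outside a step changes on every replay, while the same reading taken inside a step is computed once and then returned from the checkpoint. ToyStepContext and fake_clock are hypothetical; fake_clock advances 60 seconds per read to stand in for real time passing between replays.

```python
from itertools import count
from typing import Any, Callable, Dict, List

# Fake clock that advances 60 seconds on every read (stand-in for a real clock)
_ticks = count(0, 60)


def fake_clock() -> int:
    return next(_ticks)


class ToyStepContext:
    """Hypothetical in-memory step checkpointing; not the SDK's API."""

    def __init__(self, checkpoints: Dict[str, Any]) -> None:
        self.checkpoints = checkpoints

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name not in self.checkpoints:
            self.checkpoints[name] = fn()
        return self.checkpoints[name]


def handler(ctx: ToyStepContext) -> Dict[str, int]:
    outside = fake_clock()  # re-evaluated on every replay
    inside = ctx.step("capture-start-time", fake_clock)  # evaluated once, then replayed
    return {"outside": outside, "inside": inside}


store: Dict[str, Any] = {}
# Three passes over the same checkpoint log, as if the function replayed twice
runs: List[Dict[str, int]] = [handler(ToyStepContext(store)) for _ in range(3)]
```

The "inside" value is identical on every pass, while the "outside" value drifts, which is exactly the kind of divergence the note warns about.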

Chained invocations across functions

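Use context.invoke() to invoke other Lambda functions from within a durable function. The calling function suspends while it waits for the invoked function to complete, and a checkpoint preserves the result. If the calling function is interrupted after the invoked function completes, it resumes with the stored result instead of invoking the function again.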

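Use this pattern when you have specialized functions that each handle a specific domain (customer validation, payment processing, inventory management) and you need to coordinate them in a workflow. Each function maintains its own logic and can be invoked by multiple orchestrator functions, which avoids code duplication.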

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

// Main orchestrator function
export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { orderId, customerId } = event;

    // Step 1: Validate customer by invoking customer service function
    const customer = await context.invoke(
      "validate-customer",
      "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
      { customerId }
    );

    if (!customer.isValid) {
      return { orderId, status: "rejected", reason: "invalid_customer" };
    }

    // Step 2: Check inventory by invoking inventory service function
    const inventory = await context.invoke(
      "check-inventory",
      "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
      { orderId, items: event.items }
    );

    if (!inventory.available) {
      return { orderId, status: "rejected", reason: "insufficient_inventory" };
    }

    // Step 3: Process payment by invoking payment service function
    const payment = await context.invoke(
      "process-payment",
      "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
      {
        customerId,
        amount: inventory.totalAmount,
        paymentMethod: customer.paymentMethod
      }
    );

    // Step 4: Create shipment by invoking fulfillment service function
    const shipment = await context.invoke(
      "create-shipment",
      "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
      {
        orderId,
        items: inventory.allocatedItems,
        address: customer.shippingAddress
      }
    );

    return {
      orderId,
      status: "completed",
      trackingNumber: shipment.trackingNumber,
      estimatedDelivery: shipment.estimatedDelivery
    };
  }
);
```
Python
```python
from aws_durable_execution_sdk_python import DurableContext, durable_execution

# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
    order_id = event['orderId']
    customer_id = event['customerId']

    # Step 1: Validate customer by invoking customer service function
    customer = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
        {'customerId': customer_id},
        name='validate-customer'
    )

    if not customer['isValid']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}

    # Step 2: Check inventory by invoking inventory service function
    inventory = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
        {'orderId': order_id, 'items': event['items']},
        name='check-inventory'
    )

    if not inventory['available']:
        return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}

    # Step 3: Process payment by invoking payment service function
    payment = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
        {
            'customerId': customer_id,
            'amount': inventory['totalAmount'],
            'paymentMethod': customer['paymentMethod']
        },
        name='process-payment'
    )

    # Step 4: Create shipment by invoking fulfillment service function
    shipment = context.invoke(
        'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
        {
            'orderId': order_id,
            'items': inventory['allocatedItems'],
            'address': customer['shippingAddress']
        },
        name='create-shipment'
    )

    return {
        'orderId': order_id,
        'status': 'completed',
        'trackingNumber': shipment['trackingNumber'],
        'estimatedDelivery': shipment['estimatedDelivery']
    }
```

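Each invocation creates a checkpoint in the orchestrator function. If the orchestrator is interrupted after customer validation completes, it resumes from that checkpoint with the stored customer data and skips the validation invocation. This prevents duplicate calls to downstream services and keeps execution consistent across interruptions.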

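The invoked function can be either a durable function or a standard Lambda function. If you invoke a durable function, it can run its own multi-step workflow with waits and checkpoints. The orchestrator simply waits for the entire durable execution to complete and receives the final result.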

Note

Cross-account invocations are not supported. All invoked functions must be in the same AWS account as the calling function.
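Because Lambda function ARNs carry the account ID as their fifth colon-separated field (arn:aws:lambda:region:account-id:function:name[:qualifier]), you could catch cross-account targets before deployment with a simple check. The helpers below are a hypothetical sketch (for example, for a CI script), not part of the SDK; the orchestrator name and the second account ID are made up for illustration.

```python
from typing import Iterable, List


def account_id_from_arn(arn: str) -> str:
    """Return the account-ID field of an ARN (arn:partition:service:region:account-id:...)."""
    parts = arn.split(":")
    if len(parts) < 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    return parts[4]


def cross_account_targets(caller_arn: str, target_arns: Iterable[str]) -> List[str]:
    """Return the target ARNs that live in a different account than the caller."""
    caller_account = account_id_from_arn(caller_arn)
    return [arn for arn in target_arns if account_id_from_arn(arn) != caller_account]


# Hypothetical orchestrator plus one same-account and one cross-account target
caller = "arn:aws:lambda:us-east-1:123456789012:function:order-orchestrator"
targets = [
    "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
    "arn:aws:lambda:us-east-1:210987654321:function:payment-service:1",
]
offending = cross_account_targets(caller, targets)
```

A non-empty offending list flags invocations that this feature would reject.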

Advanced patterns

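Use durable functions to build complex multi-step applications that combine multiple durable operations, parallel execution, array processing, conditional logic, and polling. These patterns let you build sophisticated applications that coordinate many tasks while maintaining fault tolerance and automatic recovery.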

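Advanced patterns go beyond simple sequential steps. You can run operations concurrently with parallel(), process arrays with map(), wait for external conditions with waitForCondition(), and combine these primitives to build reliable applications. Each durable operation creates its own checkpoint, so your application can recover from any point if it is interrupted.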
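The maxConcurrency / max_concurrency option used with map() later in this section caps how many items are processed at once. The sketch below illustrates that idea with asyncio.Semaphore, a swapped-in technique that is not how the SDK implements map(); bounded_map and square are hypothetical names, and the in_flight/peak counters exist only to show that the cap holds.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_map(
    items: Sequence[T],
    fn: Callable[[T, int], Awaitable[R]],
    max_concurrency: int,
) -> List[R]:
    """Apply fn to every item with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(index: int, item: T) -> R:
        async with semaphore:  # blocks while max_concurrency calls are running
            return await fn(item, index)

    # gather returns results in input order, regardless of completion order
    return list(await asyncio.gather(*(run_one(i, item) for i, item in enumerate(items))))


in_flight = 0
peak = 0


async def square(item: int, index: int) -> int:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # stand-in for an I/O-bound call
    in_flight -= 1
    return item * item


results = asyncio.run(bounded_map(list(range(12)), square, max_concurrency=5))
```

The results come back in input order even though items finish at different times, and no more than five calls ever run at the same time.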

User onboarding process

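Guide users through registration, email verification, profile setup, and initial configuration, with retry handling. This example combines sequential steps, callbacks, conditional logic, and parallel operations to create a complete onboarding process.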

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { userId, email } = event;

    // Step 1: Create user account
    const user = await context.step("create-account", async () => {
      return await userService.createAccount(userId, email);
    });

    // Step 2: Send verification email
    await context.step("send-verification", async () => {
      return await emailService.sendVerification(email);
    });

    // Step 3: Wait for email verification (up to 48 hours)
    const verified = await context.waitForCallback(
      "email-verification",
      async (callbackId) => {
        await notificationService.sendVerificationLink({
          email,
          callbackId,
          expiresIn: 172800
        });
      },
      { timeout: { seconds: 172800 } }
    );

    if (!verified) {
      await context.step("send-reminder", async () => {
        await emailService.sendReminder(email);
      });

      return {
        status: "verification_timeout",
        userId,
        message: "Email verification not completed within 48 hours"
      };
    }

    // Step 4: Initialize user profile in parallel
    const setupResults = await context.parallel("profile-setup", [
      async (ctx: DurableContext) => {
        return await ctx.step("create-preferences", async () => {
          return await preferencesService.createDefaults(userId);
        });
      },
      async (ctx: DurableContext) => {
        return await ctx.step("setup-notifications", async () => {
          return await notificationService.setupDefaults(userId);
        });
      },
      async (ctx: DurableContext) => {
        return await ctx.step("create-welcome-content", async () => {
          return await contentService.createWelcome(userId);
        });
      }
    ]);

    // Step 5: Send welcome email
    await context.step("send-welcome", async () => {
      const [preferences, notifications, content] = setupResults.getResults();
      return await emailService.sendWelcome({
        email,
        preferences,
        notifications,
        content
      });
    });

    return {
      status: "onboarding_complete",
      userId,
      completedAt: new Date().toISOString()
    };
  }
);
```
Python
```python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime

@durable_execution
def lambda_handler(event, context: DurableContext):
    user_id = event['userId']
    email = event['email']

    # Step 1: Create user account
    user = context.step(
        lambda _: user_service.create_account(user_id, email),
        name='create-account'
    )

    # Step 2: Send verification email
    context.step(
        lambda _: email_service.send_verification(email),
        name='send-verification'
    )

    # Step 3: Wait for email verification (up to 48 hours)
    def send_verification_link(callback_id):
        notification_service.send_verification_link({
            'email': email,
            'callbackId': callback_id,
            'expiresIn': 172800
        })

    verified = context.wait_for_callback(
        send_verification_link,
        name='email-verification',
        config=WaitConfig(timeout=172800)
    )

    if not verified:
        context.step(
            lambda _: email_service.send_reminder(email),
            name='send-reminder'
        )
        return {
            'status': 'verification_timeout',
            'userId': user_id,
            'message': 'Email verification not completed within 48 hours'
        }

    # Step 4: Initialize user profile in parallel
    def create_preferences(ctx: DurableContext):
        return ctx.step(
            lambda _: preferences_service.create_defaults(user_id),
            name='create-preferences'
        )

    def setup_notifications(ctx: DurableContext):
        return ctx.step(
            lambda _: notification_service.setup_defaults(user_id),
            name='setup-notifications'
        )

    def create_welcome_content(ctx: DurableContext):
        return ctx.step(
            lambda _: content_service.create_welcome(user_id),
            name='create-welcome-content'
        )

    setup_results = context.parallel(
        [create_preferences, setup_notifications, create_welcome_content],
        name='profile-setup'
    )

    # Step 5: Send welcome email
    def send_welcome(_):
        results = setup_results.get_results()
        preferences, notifications, content = results[0], results[1], results[2]
        return email_service.send_welcome({
            'email': email,
            'preferences': preferences,
            'notifications': notifications,
            'content': content
        })

    context.step(send_welcome, name='send-welcome')

    return {
        'status': 'onboarding_complete',
        'userId': user_id,
        'completedAt': datetime.now().isoformat()
    }
```

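The process combines sequential, checkpointed steps for account creation and email delivery, then pauses for up to 48 hours to wait for email verification without consuming resources. Conditional logic handles the different paths depending on whether verification completes or times out. The profile setup tasks run concurrently in a parallel operation to reduce total execution time, and each step retries automatically on transient failures so that onboarding completes reliably.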

Batch processing with checkpoints

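Process millions of records, with automatic recovery from the last successful checkpoint after a failure. This example shows how durable functions combine the map() operation with chunking and concurrency limits to handle large-scale data processing.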

TypeScript
```typescript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";

interface Batch {
  batchIndex: number;
  recordIds: string[];
}

export const handler = withDurableExecution(
  async (event: any, context: DurableContext) => {
    const { datasetId, batchSize = 1000 } = event;

    // Step 1: Get all record IDs to process
    const recordIds = await context.step("fetch-record-ids", async () => {
      return await dataService.getRecordIds(datasetId);
    });

    // Step 2: Split into batches
    const batches: Batch[] = [];
    for (let i = 0; i < recordIds.length; i += batchSize) {
      batches.push({
        batchIndex: Math.floor(i / batchSize),
        recordIds: recordIds.slice(i, i + batchSize)
      });
    }

    // Step 3: Process batches with controlled concurrency
    const batchResults = await context.map(
      "process-batches",
      batches,
      async (ctx: DurableContext, batch: Batch, index: number) => {
        const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
          const results = [];
          for (const recordId of batch.recordIds) {
            const result = await recordService.process(recordId);
            results.push(result);
          }
          return results;
        });

        const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
          return await validationService.validateBatch(processed);
        });

        return {
          batchIndex: batch.batchIndex,
          recordCount: batch.recordIds.length,
          successCount: validated.successCount,
          failureCount: validated.failureCount
        };
      },
      { maxConcurrency: 5 }
    );

    // Step 4: Aggregate results
    const summary = await context.step("aggregate-results", async () => {
      const results = batchResults.getResults();
      const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
      const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
      return {
        datasetId,
        totalRecords: recordIds.length,
        batchesProcessed: batches.length,
        successCount: totalSuccess,
        failureCount: totalFailure,
        completedAt: new Date().toISOString()
      };
    });

    return summary;
  }
);
```
Python
```python
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict

@durable_execution
def lambda_handler(event, context: DurableContext):
    dataset_id = event['datasetId']
    batch_size = event.get('batchSize', 1000)

    # Step 1: Get all record IDs to process
    record_ids = context.step(
        lambda _: data_service.get_record_ids(dataset_id),
        name='fetch-record-ids'
    )

    # Step 2: Split into batches
    batches = []
    for i in range(0, len(record_ids), batch_size):
        batches.append({
            'batchIndex': i // batch_size,
            'recordIds': record_ids[i:i + batch_size]
        })

    # Step 3: Process batches with controlled concurrency
    def process_batch(ctx: DurableContext, batch: Dict, index: int):
        batch_index = batch['batchIndex']

        def process_records(_):
            results = []
            for record_id in batch['recordIds']:
                result = record_service.process(record_id)
                results.append(result)
            return results

        processed = ctx.step(process_records, name=f'batch-{batch_index}')

        validated = ctx.step(
            lambda _: validation_service.validate_batch(processed),
            name=f'validate-{batch_index}'
        )

        return {
            'batchIndex': batch_index,
            'recordCount': len(batch['recordIds']),
            'successCount': validated['successCount'],
            'failureCount': validated['failureCount']
        }

    batch_results = context.map(
        process_batch,
        batches,
        name='process-batches',
        config=MapConfig(max_concurrency=5)
    )

    # Step 4: Aggregate results
    def aggregate_results(_):
        results = batch_results.get_results()
        total_success = sum(r['successCount'] for r in results)
        total_failure = sum(r['failureCount'] for r in results)
        return {
            'datasetId': dataset_id,
            'totalRecords': len(record_ids),
            'batchesProcessed': len(batches),
            'successCount': total_success,
            'failureCount': total_failure,
            'completedAt': datetime.now().isoformat()
        }

    summary = context.step(aggregate_results, name='aggregate-results')

    return summary
```

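Records are split into manageable batches to avoid overwhelming memory or downstream services, and maxConcurrency controls how many batches are processed at the same time. Each batch has its own checkpoints, so a failure retries only the failed batch instead of reprocessing every record. This pattern is well suited to ETL jobs, data migrations, and bulk operations that can take hours.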
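The "retry only the failed batch" behavior can be illustrated with a toy model that gives each batch its own named checkpoint. ToyStepContext, process_batch, and the failure injection are hypothetical; for simplicity the sketch processes batches sequentially (the real map() can run them concurrently under maxConcurrency) and models a retry as a fresh invocation over the same checkpoint log.

```python
from typing import Any, Callable, Dict, List


class ToyStepContext:
    """Hypothetical in-memory model with one checkpoint per named step; not the SDK."""

    def __init__(self, checkpoints: Dict[str, Any]) -> None:
        self.checkpoints = checkpoints

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name not in self.checkpoints:
            self.checkpoints[name] = fn()
        return self.checkpoints[name]


attempted: List[int] = []
fail_once = {2}  # batch 2 fails on its first attempt only


def process_batch(batch_index: int, record_ids: List[str]) -> int:
    attempted.append(batch_index)
    if batch_index in fail_once:
        fail_once.discard(batch_index)
        raise RuntimeError(f"batch {batch_index} failed")
    return len(record_ids)


def handler(ctx: ToyStepContext, record_ids: List[str], batch_size: int) -> int:
    total = 0
    for start in range(0, len(record_ids), batch_size):
        index = start // batch_size
        chunk = record_ids[start:start + batch_size]
        # The lambda runs immediately inside step(), so capturing loop variables is safe
        total += ctx.step(f"batch-{index}", lambda: process_batch(index, chunk))
    return total


records = [f"r{i}" for i in range(10)]
store: Dict[str, Any] = {}
try:
    handler(ToyStepContext(store), records, batch_size=3)
except RuntimeError:
    pass  # first invocation stops at batch 2
total = handler(ToyStepContext(store), records, batch_size=3)
```

On the second invocation, batches 0 and 1 come straight from their checkpoints, so only batch 2 is attempted again before batch 3 runs for the first time.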

Next steps