Beispiele und Anwendungsfälle - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Beispiele und Anwendungsfälle

Mit den dauerhaften Lambda-Funktionen können Sie fehlertolerante, mehrstufige Anwendungen mit dauerhaften Vorgängen wie Steps und Waits erstellen. Mit automatischem Checkpointing und einem Checkpoint-Replay-Modell, bei dem die Ausführung nach einem Ausfall wieder von vorne beginnt, abgeschlossene Checkpoints jedoch überspringt, können Ihre Funktionen nach Ausfällen wiederhergestellt und die Ausführung wieder aufgenommen werden, ohne dass der Fortschritt verloren geht.

Kurzlebige fehlertolerante Prozesse

Verwenden Sie langlebige Funktionen, um zuverlässige Abläufe aufzubauen, die in der Regel innerhalb von Minuten abgeschlossen werden. Diese Prozesse sind zwar kürzer als langwierige Workflows, profitieren aber dennoch von automatischem Checkpoint und Fehlertoleranz in verteilten Systemen. Zuverlässige Funktionen stellen sicher, dass Ihre mehrstufigen Prozesse auch dann erfolgreich abgeschlossen werden, wenn einzelne Serviceanrufe fehlschlagen, ohne dass eine komplexe Fehlerbehandlung oder ein Code für die Statusverwaltung erforderlich sind.

Zu den gängigen Szenarien gehören Hotelbuchungssysteme, Restaurantreservierungsplattformen, Anfragen für Mitfahrgelegenheiten, der Kauf von Veranstaltungstickets und SaaS-Abonnement-Upgrades. Diese Szenarien weisen gemeinsame Merkmale auf: mehrere Serviceanrufe, die zusammen abgeschlossen werden müssen, die Notwendigkeit, bei vorübergehenden Ausfällen automatische Wiederholungsversuche durchzuführen, und die Anforderung, einen konsistenten Status über verteilte Systeme hinweg aufrechtzuerhalten.

Verteilte Transaktionen über Microservices hinweg

Koordinieren Sie Zahlungen, Inventar und Versand für mehrere Dienste mit automatischem Rollback bei Ausfällen. Jeder Servicevorgang ist in einem Schritt zusammengefasst, sodass sichergestellt wird, dass die Transaktion jederzeit wiederhergestellt werden kann, falls ein Service ausfällt.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, amount, items } = event; // Reserve inventory across multiple warehouses const inventory = await context.step("reserve-inventory", async () => { return await inventoryService.reserve(items); }); // Process payment const payment = await context.step("process-payment", async () => { return await paymentService.charge(amount); }); // Create shipment const shipment = await context.step("create-shipment", async () => { return await shippingService.createShipment(orderId, inventory); }); return { orderId, status: 'completed', shipment }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] amount = event['amount'] items = event['items'] # Reserve inventory across multiple warehouses inventory = context.step( lambda _: inventory_service.reserve(items), name='reserve-inventory' ) # Process payment payment = context.step( lambda _: payment_service.charge(amount), name='process-payment' ) # Create shipment shipment = context.step( lambda _: shipping_service.create_shipment(order_id, inventory), name='create-shipment' ) return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}

Schlägt ein Schritt fehl, wiederholt die Funktion den Vorgang automatisch vom letzten erfolgreichen Checkpoint aus. Die Inventarreservierung bleibt auch dann bestehen, wenn die Zahlungsabwicklung vorübergehend fehlschlägt. Wenn die Funktion es erneut versucht, überspringt sie den abgeschlossenen Inventarisierungsschritt und fährt direkt mit der Zahlungsabwicklung fort. Dadurch werden doppelte Reservierungen vermieden und ein einheitlicher Status in Ihrem gesamten verteilten System gewährleistet.

Auftragsabwicklung mit mehreren Schritten

Bearbeiten Sie Bestellungen durch Validierung, Zahlungsautorisierung, Inventarzuweisung und Versand mit automatischer Wiederholung und Wiederherstellung. Jeder Schritt wird überprüft, sodass sichergestellt wird, dass die Bestellung auch dann bearbeitet wird, wenn einzelne Schritte fehlschlagen und es erneut versucht wird.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId, items } = event; // Validate order details const validation = await context.step("validate-order", async () => { const customer = await customerService.validate(customerId); const itemsValid = await inventoryService.validateItems(items); return { customer, itemsValid }; }); if (!validation.itemsValid) { return { orderId, status: 'rejected', reason: 'invalid_items' }; } // Authorize payment const authorization = await context.step("authorize-payment", async () => { return await paymentService.authorize( validation.customer.paymentMethod, calculateTotal(items) ); }); // Allocate inventory const allocation = await context.step("allocate-inventory", async () => { return await inventoryService.allocate(items); }); // Fulfill order const fulfillment = await context.step("fulfill-order", async () => { return await fulfillmentService.createShipment({ orderId, items: allocation.allocatedItems, address: validation.customer.shippingAddress }); }); return { orderId, status: 'completed', trackingNumber: fulfillment.trackingNumber }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] items = event['items'] # Validate order details def validate_order(_): customer = customer_service.validate(customer_id) items_valid = inventory_service.validate_items(items) return {'customer': customer, 'itemsValid': items_valid} validation = context.step(validate_order, name='validate-order') if not validation['itemsValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'} # Authorize payment authorization = context.step( lambda _: payment_service.authorize( validation['customer']['paymentMethod'], calculate_total(items) ), name='authorize-payment' ) # Allocate inventory allocation = context.step( lambda _: inventory_service.allocate(items), name='allocate-inventory' ) # Fulfill order fulfillment = context.step( lambda _: fulfillment_service.create_shipment({ 'orderId': order_id, 'items': allocation['allocatedItems'], 'address': validation['customer']['shippingAddress'] }), name='fulfill-order' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': fulfillment['trackingNumber'] }

Dieses Muster stellt sicher, dass Bestellungen niemals in Zwischenzuständen stecken bleiben. Schlägt die Validierung fehl, wird die Bestellung vor der Zahlungsautorisierung abgelehnt. Wenn die Zahlungsautorisierung fehlschlägt, wird kein Inventar zugewiesen. Jeder Schritt baut auf dem vorherigen auf und bietet automatische Wiederholungen und Wiederherstellungen.

Hinweis

Die bedingte Prüfung if (!validation.itemsValid) befindet sich außerhalb eines Schritts und wird während der Wiedergabe erneut ausgeführt. Das ist sicher, weil es deterministisch ist — es liefert immer dasselbe Ergebnis bei demselben Überprüfungsobjekt.

Prozesse mit langer Laufzeit

Verwenden Sie langlebige Funktionen für Prozesse, die sich über Stunden, Tage oder Wochen erstrecken. Wartevorgänge unterbrechen die Ausführung, ohne dass Rechenkosten anfallen, wodurch Prozesse mit langer Laufzeit kostengünstig werden. Während Wartezeiten wird Ihre Funktion nicht mehr ausgeführt und Lambda recycelt die Ausführungsumgebung. Wenn es Zeit ist, fortzufahren, ruft Lambda Ihre Funktion erneut auf und spielt sie ab dem letzten Checkpoint erneut ab.

Dieses Ausführungsmodell macht langlebige Funktionen ideal für Prozesse, die über einen längeren Zeitraum unterbrochen werden müssen, unabhängig davon, ob sie auf menschliche Entscheidungen, externe Systemreaktionen, geplante Verarbeitungsfenster oder zeitbedingte Verzögerungen warten. Sie zahlen nur für aktive Rechenzeit, nicht für Wartezeiten.

Zu den gängigen Szenarien gehören Prozesse zur Genehmigung von Dokumenten, geplante Stapelverarbeitung, mehrtägige Onboarding-Prozesse, Abonnement-Testprozesse und Systeme für verzögerte Benachrichtigungen. Diese Szenarien haben gemeinsame Merkmale: längere Wartezeiten, gemessen in Stunden oder Tagen, die Notwendigkeit, den Ausführungsstatus während dieser Wartezeiten beizubehalten, und kostensensible Anforderungen, bei denen es unerschwinglich ist, für ungenutzte Rechenzeit zu bezahlen.

Human-in-the-loop Zulassungen

Unterbrechen Sie die Ausführung für Dokumentenprüfungen, Genehmigungen oder Entscheidungen und behalten Sie dabei den Ausführungsstatus bei. Die Funktion wartet auf externe Rückrufe, ohne Ressourcen zu verbrauchen, und wird automatisch fortgesetzt, wenn die Genehmigung eingeht.

Dieses Muster ist wichtig für Prozesse, die menschliches Urteilsvermögen oder externe Validierung erfordern. Die Funktion wird am Rückrufpunkt unterbrochen, sodass während des Wartens keine Rechenkosten anfallen. Wenn jemand seine Entscheidung über die API übermittelt, ruft Lambda Ihre Funktion erneut auf und spielt sie vom Checkpoint aus erneut ab, wobei das Genehmigungsergebnis angezeigt wird.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { documentId, reviewers } = event; // Step 1: Prepare document for review const prepared = await context.step("prepare-document", async () => { return await documentService.prepare(documentId); }); // Step 2: Request approval with callback const approval = await context.waitForCallback( "approval-callback", async (callbackId) => { await notificationService.sendApprovalRequest({ documentId, reviewers, callbackId, expiresIn: 86400 }); }, { timeout: { seconds: 86400 } } ); // Function resumes here when approval is received if (approval?.approved) { const finalized = await context.step("finalize-document", async () => { return await documentService.finalize(documentId, approval.comments); }); return { status: 'approved', documentId, finalizedAt: finalized.timestamp }; } // Handle rejection await context.step("archive-rejected", async () => { await documentService.archive(documentId, approval?.reason); }); return { status: 'rejected', documentId, reason: approval?.reason }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig @durable_execution def lambda_handler(event, context: DurableContext): document_id = event['documentId'] reviewers = event['reviewers'] # Step 1: Prepare document for review prepared = context.step( lambda _: document_service.prepare(document_id), name='prepare-document' ) # Step 2: Request approval with callback def send_approval_request(callback_id): notification_service.send_approval_request({ 'documentId': document_id, 'reviewers': reviewers, 'callbackId': callback_id, 'expiresIn': 86400 }) approval = context.wait_for_callback( send_approval_request, name='approval-callback', config=WaitConfig(timeout=86400) ) # Function resumes here when approval is received if approval and approval.get('approved'): finalized = context.step( lambda _: document_service.finalize(document_id, approval.get('comments')), name='finalize-document' ) return { 'status': 'approved', 'documentId': document_id, 'finalizedAt': finalized['timestamp'] } # Handle rejection context.step( lambda _: document_service.archive(document_id, approval.get('reason') if approval else None), name='archive-rejected' ) return { 'status': 'rejected', 'documentId': document_id, 'reason': approval.get('reason') if approval else None }

Wenn der Rückruf eingeht und Ihre Funktion wieder aufgenommen wird, wird er von Anfang an wiedergegeben. Der Schritt „Dokument vorbereiten“ gibt sofort das Ergebnis der Prüfung zurück. Der waitForCallback Vorgang kehrt außerdem sofort mit dem gespeicherten Genehmigungsergebnis zurück, anstatt erneut zu warten. Die Ausführung wird dann mit den Finalisierungs- oder Archivierungsschritten fortgesetzt.

Mehrstufige Daten-Pipelines

Verarbeiten Sie große Datensätze durch Extraktions-, Transformations- und Ladephasen mit Checkpoints zwischen den Phasen. Jede Phase kann Stunden dauern, bis sie abgeschlossen ist, und Checkpoints stellen sicher, dass die Pipeline bei einer Unterbrechung aus jeder Phase wieder aufgenommen werden kann.

Dieses Muster ist ideal für ETL-Workflows, Datenmigrationen oder Batch-Verarbeitungsaufträge, bei denen Sie Daten stufenweise mit Wiederherstellungspunkten zwischen ihnen verarbeiten müssen. Wenn eine Phase fehlschlägt, wird die Pipeline mit der letzten abgeschlossenen Phase fortgesetzt, anstatt von vorne neu zu starten. Sie können Warteoperationen auch verwenden, um zwischen den Phasen eine Pause einzulegen. Dabei können Sie Ratenbegrenzungen einhalten, warten, bis nachgelagerte Systeme bereit sind, oder die Verarbeitung außerhalb der Spitzenzeiten planen.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize } = event; // Stage 1: Extract data from source const extracted = await context.step("extract-data", async () => { const records = await sourceDatabase.extractRecords(datasetId); return { recordCount: records.length, records }; }); // Wait 5 minutes to respect source system rate limits await context.wait({ seconds: 300 }); // Stage 2: Transform data in batches const transformed = await context.step("transform-data", async () => { const batches = chunkArray(extracted.records, batchSize); const results = []; for (const batch of batches) { const transformed = await transformService.processBatch(batch); results.push(transformed); } return { batchCount: batches.length, results }; }); // Wait until off-peak hours (e.g., 2 AM) const now = new Date(); const targetHour = 2; const msUntilTarget = calculateMsUntilHour(now, targetHour); await context.wait({ seconds: Math.floor(msUntilTarget / 1000) }); // Stage 3: Load data to destination const loaded = await context.step("load-data", async () => { let loadedCount = 0; for (const result of transformed.results) { await destinationDatabase.loadBatch(result); loadedCount += result.length; } return { loadedCount }; }); // Stage 4: Verify and finalize const verified = await context.step("verify-pipeline", async () => { const verification = await destinationDatabase.verifyRecords(datasetId); await pipelineService.markComplete(datasetId, verification); return verification; }); return { datasetId, recordsProcessed: extracted.recordCount, batchesProcessed: transformed.batchCount, recordsLoaded: loaded.loadedCount, verified: verified.success }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event['batchSize'] # Stage 1: Extract data from source def extract_data(_): records = source_database.extract_records(dataset_id) return {'recordCount': len(records), 'records': records} extracted = context.step(extract_data, name='extract-data') # Wait 5 minutes to respect source system rate limits context.wait(300) # Stage 2: Transform data in batches def transform_data(_): batches = chunk_array(extracted['records'], batch_size) results = [] for batch in batches: transformed = transform_service.process_batch(batch) results.append(transformed) return {'batchCount': len(batches), 'results': results} transformed = context.step(transform_data, name='transform-data') # Wait until off-peak hours (e.g., 2 AM) now = datetime.now() target_hour = 2 ms_until_target = calculate_ms_until_hour(now, target_hour) context.wait(ms_until_target // 1000) # Stage 3: Load data to destination def load_data(_): loaded_count = 0 for result in transformed['results']: destination_database.load_batch(result) loaded_count += len(result) return {'loadedCount': loaded_count} loaded = context.step(load_data, name='load-data') # Stage 4: Verify and finalize def verify_pipeline(_): verification = destination_database.verify_records(dataset_id) pipeline_service.mark_complete(dataset_id, verification) return verification verified = context.step(verify_pipeline, name='verify-pipeline') return { 'datasetId': dataset_id, 'recordsProcessed': extracted['recordCount'], 'batchesProcessed': transformed['batchCount'], 'recordsLoaded': loaded['loadedCount'], 'verified': verified['success'] }

Jede Phase ist in einem Schritt zusammengefasst, wodurch ein Checkpoint entsteht, über den die Pipeline bei einer Unterbrechung aus jeder Phase wieder aufgenommen werden kann. Bei der 5-minütigen Wartezeit zwischen Extrahieren und Transformieren werden die Geschwindigkeitsbegrenzungen des Quellsystems eingehalten, ohne Rechenressourcen zu verbrauchen, während bei der Wartezeit bis 2 Uhr der teure Ladevorgang außerhalb der Spitzenzeiten geplant wird.

Hinweis

Der new Date() Aufruf und die calculateMsUntilHour() Funktion befinden sich außerhalb von Schritten und werden während der Wiedergabe erneut ausgeführt. Für zeitbasierte Operationen, die bei allen Wiederholungen konsistent sein müssen, berechnen Sie den Zeitstempel innerhalb eines Schritts oder verwenden Sie ihn nur für Wartezeiten (die mit Checkpoints versehen sind).

Funktionsübergreifende verkettete Aufrufe

Rufen Sie andere Lambda-Funktionen aus einer dauerhaften Funktion heraus auf, indem Sie. context.invoke() Die aufrufende Funktion wird angehalten, während auf den Abschluss der aufgerufenen Funktion gewartet wird, wodurch ein Checkpoint erstellt wird, der das Ergebnis beibehält. Wenn die aufrufende Funktion nach Abschluss der aufgerufenen Funktion unterbrochen wird, wird sie mit dem gespeicherten Ergebnis fortgesetzt, ohne die Funktion erneut aufzurufen.

Verwenden Sie dieses Muster, wenn Sie über spezielle Funktionen verfügen, die bestimmte Bereiche (Kundenvalidierung, Zahlungsabwicklung, Bestandsverwaltung) behandeln und diese in einem Workflow koordinieren müssen. Jede Funktion behält ihre eigene Logik bei und kann von mehreren Orchestrator-Funktionen aufgerufen werden, wodurch Codeduplikationen vermieden werden.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; // Main orchestrator function export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId } = event; // Step 1: Validate customer by invoking customer service function const customer = await context.invoke( "validate-customer", "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1", { customerId } ); if (!customer.isValid) { return { orderId, status: "rejected", reason: "invalid_customer" }; } // Step 2: Check inventory by invoking inventory service function const inventory = await context.invoke( "check-inventory", "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1", { orderId, items: event.items } ); if (!inventory.available) { return { orderId, status: "rejected", reason: "insufficient_inventory" }; } // Step 3: Process payment by invoking payment service function const payment = await context.invoke( "process-payment", "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1", { customerId, amount: inventory.totalAmount, paymentMethod: customer.paymentMethod } ); // Step 4: Create shipment by invoking fulfillment service function const shipment = await context.invoke( "create-shipment", "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1", { orderId, items: inventory.allocatedItems, address: customer.shippingAddress } ); return { orderId, status: "completed", trackingNumber: shipment.trackingNumber, estimatedDelivery: shipment.estimatedDelivery }; } );
Python
from aws_durable_execution_sdk_python import DurableContext, durable_execution # Main orchestrator function @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] # Step 1: Validate customer by invoking customer service function customer = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1', {'customerId': customer_id}, name='validate-customer' ) if not customer['isValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'} # Step 2: Check inventory by invoking inventory service function inventory = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1', {'orderId': order_id, 'items': event['items']}, name='check-inventory' ) if not inventory['available']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'} # Step 3: Process payment by invoking payment service function payment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1', { 'customerId': customer_id, 'amount': inventory['totalAmount'], 'paymentMethod': customer['paymentMethod'] }, name='process-payment' ) # Step 4: Create shipment by invoking fulfillment service function shipment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1', { 'orderId': order_id, 'items': inventory['allocatedItems'], 'address': customer['shippingAddress'] }, name='create-shipment' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': shipment['trackingNumber'], 'estimatedDelivery': shipment['estimatedDelivery'] }

Jeder Aufruf erstellt einen Checkpoint in der Orchestrator-Funktion. Wenn der Orchestrator nach Abschluss der Kundenvalidierung unterbrochen wird, wird er von diesem Checkpoint aus mit den gespeicherten Kundendaten wieder aufgenommen und der Validierungsaufruf übersprungen. Dies verhindert doppelte Aufrufe von nachgelagerten Diensten und gewährleistet eine konsistente Ausführung auch bei Unterbrechungen.

Bei den aufgerufenen Funktionen kann es sich entweder um dauerhafte Funktionen oder um Standard-Lambda-Funktionen handeln. Wenn Sie eine dauerhafte Funktion aufrufen, kann sie über einen eigenen mehrstufigen Workflow mit Wartezeiten und Checkpoints verfügen. Der Orchestrator wartet einfach darauf, dass die komplette Durable-Ausführung abgeschlossen ist, und erhält das Endergebnis.

Anmerkung

Kontoübergreifende Aufrufe werden nicht unterstützt. Alle aufgerufenen Funktionen müssen sich im selben AWS Konto wie die aufrufende Funktion befinden.

Fortgeschrittene Muster

Verwenden Sie langlebige Funktionen, um komplexe mehrstufige Anwendungen zu erstellen, die mehrere dauerhafte Operationen, parallel Ausführung, Array-Verarbeitung, bedingte Logik und Polling kombinieren. Mit diesen Mustern können Sie anspruchsvolle Anwendungen erstellen, die viele Aufgaben koordinieren und gleichzeitig Fehlertoleranz und automatische Wiederherstellung gewährleisten.

Fortgeschrittene Muster gehen über einfache sequentielle Schritte hinaus. Sie können Operationen gleichzeitig mit diesen Primitiven ausführenparallel(), Arrays mit verarbeitenmap(), auf externe Bedingungen warten und diese Primitive kombinierenwaitForCondition(), um zuverlässige Anwendungen zu erstellen. Jeder dauerhafte Vorgang erstellt seine eigenen Prüfpunkte, sodass Ihre Anwendung bei einer Unterbrechung jederzeit wiederhergestellt werden kann.

Onboarding-Prozesse für Benutzer

Führen Sie Benutzer durch die Registrierung, E-Mail-Überprüfung, Profileinrichtung und Erstkonfiguration mit Wiederholungsversuchen. In diesem Beispiel werden aufeinanderfolgende Schritte, Rückrufe und bedingte Logik kombiniert, um einen vollständigen Onboarding-Prozess zu erstellen.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { userId, email } = event; // Step 1: Create user account const user = await context.step("create-account", async () => { return await userService.createAccount(userId, email); }); // Step 2: Send verification email await context.step("send-verification", async () => { return await emailService.sendVerification(email); }); // Step 3: Wait for email verification (up to 48 hours) const verified = await context.waitForCallback( "email-verification", async (callbackId) => { await notificationService.sendVerificationLink({ email, callbackId, expiresIn: 172800 }); }, { timeout: { seconds: 172800 } } ); if (!verified) { await context.step("send-reminder", async () => { await emailService.sendReminder(email); }); return { status: "verification_timeout", userId, message: "Email verification not completed within 48 hours" }; } // Step 4: Initialize user profile in parallel const setupResults = await context.parallel("profile-setup", [ async (ctx: DurableContext) => { return await ctx.step("create-preferences", async () => { return await preferencesService.createDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("setup-notifications", async () => { return await notificationService.setupDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("create-welcome-content", async () => { return await contentService.createWelcome(userId); }); } ]); // Step 5: Send welcome email await context.step("send-welcome", async () => { const [preferences, notifications, content] = setupResults.getResults(); return await emailService.sendWelcome({ email, preferences, notifications, content }); }); return { status: "onboarding_complete", userId, completedAt: new Date().toISOString() }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): user_id = event['userId'] email = event['email'] # Step 1: Create user account user = context.step( lambda _: user_service.create_account(user_id, email), name='create-account' ) # Step 2: Send verification email context.step( lambda _: email_service.send_verification(email), name='send-verification' ) # Step 3: Wait for email verification (up to 48 hours) def send_verification_link(callback_id): notification_service.send_verification_link({ 'email': email, 'callbackId': callback_id, 'expiresIn': 172800 }) verified = context.wait_for_callback( send_verification_link, name='email-verification', config=WaitConfig(timeout=172800) ) if not verified: context.step( lambda _: email_service.send_reminder(email), name='send-reminder' ) return { 'status': 'verification_timeout', 'userId': user_id, 'message': 'Email verification not completed within 48 hours' } # Step 4: Initialize user profile in parallel def create_preferences(ctx: DurableContext): return ctx.step( lambda _: preferences_service.create_defaults(user_id), name='create-preferences' ) def setup_notifications(ctx: DurableContext): return ctx.step( lambda _: notification_service.setup_defaults(user_id), name='setup-notifications' ) def create_welcome_content(ctx: DurableContext): return ctx.step( lambda _: content_service.create_welcome(user_id), name='create-welcome-content' ) setup_results = context.parallel( [create_preferences, setup_notifications, create_welcome_content], name='profile-setup' ) # Step 5: Send welcome email def send_welcome(_): results = setup_results.get_results() preferences, notifications, content = results[0], results[1], results[2] return email_service.send_welcome({ 'email': email, 'preferences': preferences, 'notifications': notifications, 'content': content }) context.step(send_welcome, name='send-welcome') return { 'status': 'onboarding_complete', 'userId': user_id, 'completedAt': datetime.now().isoformat() }

Der Prozess kombiniert aufeinanderfolgende Schritte mit Checkpoints für die Kontoerstellung und das Senden von E-Mails und wartet dann bis zu 48 Stunden lang auf die E-Mail-Bestätigung, ohne Ressourcen zu verbrauchen. Die bedingte Logik verarbeitet unterschiedliche Pfade, je nachdem, ob die Überprüfung abgeschlossen ist oder ob das Zeitlimit überschritten wird. Aufgaben zur Profileinrichtung werden gleichzeitig mit parallel Vorgängen ausgeführt, um die Gesamtausführungszeit zu reduzieren. Jeder Schritt wird bei vorübergehenden Ausfällen automatisch wiederholt, um sicherzustellen, dass das Onboarding zuverlässig abgeschlossen wird.

Stapelverarbeitung mit Checkpoints

Verarbeiten Sie Millionen von Datensätzen mit automatischer Wiederherstellung vom letzten erfolgreichen Checkpoint nach Ausfällen. Dieses Beispiel zeigt, wie langlebige Funktionen map() Operationen mit Chunking und Ratenbegrenzung kombinieren, um umfangreiche Datenverarbeitungen zu bewältigen.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; interface Batch { batchIndex: number; recordIds: string[]; } export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize = 1000 } = event; // Step 1: Get all record IDs to process const recordIds = await context.step("fetch-record-ids", async () => { return await dataService.getRecordIds(datasetId); }); // Step 2: Split into batches const batches: Batch[] = []; for (let i = 0; i < recordIds.length; i += batchSize) { batches.push({ batchIndex: Math.floor(i / batchSize), recordIds: recordIds.slice(i, i + batchSize) }); } // Step 3: Process batches with controlled concurrency const batchResults = await context.map( "process-batches", batches, async (ctx: DurableContext, batch: Batch, index: number) => { const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => { const results = []; for (const recordId of batch.recordIds) { const result = await recordService.process(recordId); results.push(result); } return results; }); const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => { return await validationService.validateBatch(processed); }); return { batchIndex: batch.batchIndex, recordCount: batch.recordIds.length, successCount: validated.successCount, failureCount: validated.failureCount }; }, { maxConcurrency: 5 } ); // Step 4: Aggregate results const summary = await context.step("aggregate-results", async () => { const results = batchResults.getResults(); const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0); const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0); return { datasetId, totalRecords: recordIds.length, batchesProcessed: batches.length, successCount: totalSuccess, failureCount: totalFailure, completedAt: new Date().toISOString() }; }); return summary; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig from datetime import datetime from typing import List, Dict @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event.get('batchSize', 1000) # Step 1: Get all record IDs to process record_ids = context.step( lambda _: data_service.get_record_ids(dataset_id), name='fetch-record-ids' ) # Step 2: Split into batches batches = [] for i in range(0, len(record_ids), batch_size): batches.append({ 'batchIndex': i // batch_size, 'recordIds': record_ids[i:i + batch_size] }) # Step 3: Process batches with controlled concurrency def process_batch(ctx: DurableContext, batch: Dict, index: int): batch_index = batch['batchIndex'] def process_records(_): results = [] for record_id in batch['recordIds']: result = record_service.process(record_id) results.append(result) return results processed = ctx.step(process_records, name=f'batch-{batch_index}') validated = ctx.step( lambda _: validation_service.validate_batch(processed), name=f'validate-{batch_index}' ) return { 'batchIndex': batch_index, 'recordCount': len(batch['recordIds']), 'successCount': validated['successCount'], 'failureCount': validated['failureCount'] } batch_results = context.map( process_batch, batches, name='process-batches', config=MapConfig(max_concurrency=5) ) # Step 4: Aggregate results def aggregate_results(_): results = batch_results.get_results() total_success = sum(r['successCount'] for r in results) total_failure = sum(r['failureCount'] for r in results) return { 'datasetId': dataset_id, 'totalRecords': len(record_ids), 'batchesProcessed': len(batches), 'successCount': total_success, 'failureCount': total_failure, 'completedAt': datetime.now().isoformat() } summary = context.step(aggregate_results, name='aggregate-results') return summary

Datensätze werden in überschaubare Stapel aufgeteilt, um eine Überlastung des Speichers oder nachgelagerte Dienste zu vermeiden. Anschließend werden mehrere Batches gleichzeitig verarbeitet, wobei die Parallelität kontrolliert wird. maxConcurrency Jeder Stapel hat seinen eigenen Prüfpunkt, sodass bei Fehlern nur der fehlgeschlagene Stapel wiederholt wird, anstatt alle Datensätze erneut zu verarbeiten. Dieses Muster ist ideal für ETL-Jobs, Datenmigrationen oder Massenvorgänge, bei denen die Verarbeitung Stunden dauern kann.

Nächste Schritte