Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Beispiele und Anwendungsfälle
Mit den dauerhaften Lambda-Funktionen können Sie fehlertolerante, mehrstufige Anwendungen mit dauerhaften Vorgängen wie Steps und Waits erstellen. Mit automatischem Checkpointing und einem Checkpoint-Replay-Modell, bei dem die Ausführung nach einem Ausfall wieder von vorne beginnt, abgeschlossene Checkpoints jedoch überspringt, können Ihre Funktionen nach Ausfällen wiederhergestellt und die Ausführung wieder aufgenommen werden, ohne dass der Fortschritt verloren geht.
Kurzlebige fehlertolerante Prozesse
Verwenden Sie langlebige Funktionen, um zuverlässige Abläufe aufzubauen, die in der Regel innerhalb von Minuten abgeschlossen werden. Diese Prozesse sind zwar kürzer als langwierige Workflows, profitieren aber dennoch von automatischem Checkpoint und Fehlertoleranz in verteilten Systemen. Zuverlässige Funktionen stellen sicher, dass Ihre mehrstufigen Prozesse auch dann erfolgreich abgeschlossen werden, wenn einzelne Serviceanrufe fehlschlagen, ohne dass eine komplexe Fehlerbehandlung oder ein Code für die Statusverwaltung erforderlich sind.
Zu den gängigen Szenarien gehören Hotelbuchungssysteme, Restaurantreservierungsplattformen, Anfragen für Mitfahrgelegenheiten, der Kauf von Veranstaltungstickets und SaaS-Abonnement-Upgrades. Diese Szenarien weisen gemeinsame Merkmale auf: mehrere Serviceanrufe, die zusammen abgeschlossen werden müssen, die Notwendigkeit, bei vorübergehenden Ausfällen automatische Wiederholungsversuche durchzuführen, und die Anforderung, einen konsistenten Status über verteilte Systeme hinweg aufrechtzuerhalten.
Verteilte Transaktionen über Microservices hinweg
Koordinieren Sie Zahlungen, Inventar und Versand für mehrere Dienste mit automatischem Rollback bei Ausfällen. Jeder Servicevorgang ist in einem Schritt zusammengefasst, sodass sichergestellt wird, dass die Transaktion jederzeit wiederhergestellt werden kann, falls ein Service ausfällt.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
Schlägt ein Schritt fehl, wiederholt die Funktion den Vorgang automatisch vom letzten erfolgreichen Checkpoint aus. Die Inventarreservierung bleibt auch dann bestehen, wenn die Zahlungsabwicklung vorübergehend fehlschlägt. Wenn die Funktion es erneut versucht, überspringt sie den abgeschlossenen Inventarisierungsschritt und fährt direkt mit der Zahlungsabwicklung fort. Dadurch werden doppelte Reservierungen vermieden und ein einheitlicher Status in Ihrem gesamten verteilten System gewährleistet.
Auftragsabwicklung mit mehreren Schritten
Bearbeiten Sie Bestellungen durch Validierung, Zahlungsautorisierung, Inventarzuweisung und Versand mit automatischer Wiederholung und Wiederherstellung. Jeder Schritt wird überprüft, sodass sichergestellt wird, dass die Bestellung auch dann bearbeitet wird, wenn einzelne Schritte fehlschlagen und es erneut versucht wird.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
Dieses Muster stellt sicher, dass Bestellungen niemals in Zwischenzuständen stecken bleiben. Schlägt die Validierung fehl, wird die Bestellung vor der Zahlungsautorisierung abgelehnt. Wenn die Zahlungsautorisierung fehlschlägt, wird kein Inventar zugewiesen. Jeder Schritt baut auf dem vorherigen auf und bietet automatische Wiederholungen und Wiederherstellungen.
Hinweis
Die bedingte Prüfung if (!validation.itemsValid) befindet sich außerhalb eines Schritts und wird während der Wiedergabe erneut ausgeführt. Das ist sicher, weil es deterministisch ist — es liefert immer dasselbe Ergebnis bei demselben Überprüfungsobjekt.
Prozesse mit langer Laufzeit
Verwenden Sie langlebige Funktionen für Prozesse, die sich über Stunden, Tage oder Wochen erstrecken. Wartevorgänge unterbrechen die Ausführung, ohne dass Rechenkosten anfallen, wodurch Prozesse mit langer Laufzeit kostengünstig werden. Während Wartezeiten wird Ihre Funktion nicht mehr ausgeführt und Lambda recycelt die Ausführungsumgebung. Wenn es Zeit ist, fortzufahren, ruft Lambda Ihre Funktion erneut auf und spielt sie ab dem letzten Checkpoint erneut ab.
Dieses Ausführungsmodell macht langlebige Funktionen ideal für Prozesse, die über einen längeren Zeitraum unterbrochen werden müssen, unabhängig davon, ob sie auf menschliche Entscheidungen, externe Systemreaktionen, geplante Verarbeitungsfenster oder zeitbedingte Verzögerungen warten. Sie zahlen nur für aktive Rechenzeit, nicht für Wartezeiten.
Zu den gängigen Szenarien gehören Prozesse zur Genehmigung von Dokumenten, geplante Stapelverarbeitung, mehrtägige Onboarding-Prozesse, Abonnement-Testprozesse und Systeme für verzögerte Benachrichtigungen. Diese Szenarien haben gemeinsame Merkmale: längere Wartezeiten, gemessen in Stunden oder Tagen, die Notwendigkeit, den Ausführungsstatus während dieser Wartezeiten beizubehalten, und kostensensible Anforderungen, bei denen es unerschwinglich ist, für ungenutzte Rechenzeit zu bezahlen.
Human-in-the-loop Zulassungen
Unterbrechen Sie die Ausführung für Dokumentenprüfungen, Genehmigungen oder Entscheidungen und behalten Sie dabei den Ausführungsstatus bei. Die Funktion wartet auf externe Rückrufe, ohne Ressourcen zu verbrauchen, und wird automatisch fortgesetzt, wenn die Genehmigung eingeht.
Dieses Muster ist wichtig für Prozesse, die menschliches Urteilsvermögen oder externe Validierung erfordern. Die Funktion wird am Rückrufpunkt unterbrochen, sodass während des Wartens keine Rechenkosten anfallen. Wenn jemand seine Entscheidung über die API übermittelt, ruft Lambda Ihre Funktion erneut auf und spielt sie vom Checkpoint aus erneut ab, wobei das Genehmigungsergebnis angezeigt wird.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
Wenn der Rückruf eingeht und Ihre Funktion wieder aufgenommen wird, wird er von Anfang an wiedergegeben. Der Schritt „Dokument vorbereiten“ gibt sofort das Ergebnis der Prüfung zurück. Der waitForCallback Vorgang kehrt außerdem sofort mit dem gespeicherten Genehmigungsergebnis zurück, anstatt erneut zu warten. Die Ausführung wird dann mit den Finalisierungs- oder Archivierungsschritten fortgesetzt.
Mehrstufige Daten-Pipelines
Verarbeiten Sie große Datensätze durch Extraktions-, Transformations- und Ladephasen mit Checkpoints zwischen den Phasen. Jede Phase kann Stunden dauern, bis sie abgeschlossen ist, und Checkpoints stellen sicher, dass die Pipeline bei einer Unterbrechung aus jeder Phase wieder aufgenommen werden kann.
Dieses Muster ist ideal für ETL-Workflows, Datenmigrationen oder Batch-Verarbeitungsaufträge, bei denen Sie Daten stufenweise mit Wiederherstellungspunkten zwischen ihnen verarbeiten müssen. Wenn eine Phase fehlschlägt, wird die Pipeline mit der letzten abgeschlossenen Phase fortgesetzt, anstatt von vorne neu zu starten. Sie können Warteoperationen auch verwenden, um zwischen den Phasen eine Pause einzulegen. Dabei können Sie Ratenbegrenzungen einhalten, warten, bis nachgelagerte Systeme bereit sind, oder die Verarbeitung außerhalb der Spitzenzeiten planen.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
Jede Phase ist in einem Schritt zusammengefasst, wodurch ein Checkpoint entsteht, über den die Pipeline bei einer Unterbrechung aus jeder Phase wieder aufgenommen werden kann. Bei der 5-minütigen Wartezeit zwischen Extrahieren und Transformieren werden die Geschwindigkeitsbegrenzungen des Quellsystems eingehalten, ohne Rechenressourcen zu verbrauchen, während bei der Wartezeit bis 2 Uhr der teure Ladevorgang außerhalb der Spitzenzeiten geplant wird.
Hinweis
Der new Date() Aufruf und die calculateMsUntilHour() Funktion befinden sich außerhalb von Schritten und werden während der Wiedergabe erneut ausgeführt. Für zeitbasierte Operationen, die bei allen Wiederholungen konsistent sein müssen, berechnen Sie den Zeitstempel innerhalb eines Schritts oder verwenden Sie ihn nur für Wartezeiten (die mit Checkpoints versehen sind).
Funktionsübergreifende verkettete Aufrufe
Rufen Sie andere Lambda-Funktionen aus einer dauerhaften Funktion heraus auf, indem Sie. context.invoke() Die aufrufende Funktion wird angehalten, während auf den Abschluss der aufgerufenen Funktion gewartet wird, wodurch ein Checkpoint erstellt wird, der das Ergebnis beibehält. Wenn die aufrufende Funktion nach Abschluss der aufgerufenen Funktion unterbrochen wird, wird sie mit dem gespeicherten Ergebnis fortgesetzt, ohne die Funktion erneut aufzurufen.
Verwenden Sie dieses Muster, wenn Sie über spezielle Funktionen verfügen, die bestimmte Bereiche (Kundenvalidierung, Zahlungsabwicklung, Bestandsverwaltung) behandeln und diese in einem Workflow koordinieren müssen. Jede Funktion behält ihre eigene Logik bei und kann von mehreren Orchestrator-Funktionen aufgerufen werden, wodurch Codeduplikationen vermieden werden.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
Jeder Aufruf erstellt einen Checkpoint in der Orchestrator-Funktion. Wenn der Orchestrator nach Abschluss der Kundenvalidierung unterbrochen wird, wird er von diesem Checkpoint aus mit den gespeicherten Kundendaten wieder aufgenommen und der Validierungsaufruf übersprungen. Dies verhindert doppelte Aufrufe von nachgelagerten Diensten und gewährleistet eine konsistente Ausführung auch bei Unterbrechungen.
Bei den aufgerufenen Funktionen kann es sich entweder um dauerhafte Funktionen oder um Standard-Lambda-Funktionen handeln. Wenn Sie eine dauerhafte Funktion aufrufen, kann sie über einen eigenen mehrstufigen Workflow mit Wartezeiten und Checkpoints verfügen. Der Orchestrator wartet einfach darauf, dass die komplette Durable-Ausführung abgeschlossen ist, und erhält das Endergebnis.
Kontoübergreifende Aufrufe werden nicht unterstützt. Alle aufgerufenen Funktionen müssen sich im selben AWS Konto wie die aufrufende Funktion befinden.
Fortgeschrittene Muster
Verwenden Sie langlebige Funktionen, um komplexe mehrstufige Anwendungen zu erstellen, die mehrere dauerhafte Operationen, parallel Ausführung, Array-Verarbeitung, bedingte Logik und Polling kombinieren. Mit diesen Mustern können Sie anspruchsvolle Anwendungen erstellen, die viele Aufgaben koordinieren und gleichzeitig Fehlertoleranz und automatische Wiederherstellung gewährleisten.
Fortgeschrittene Muster gehen über einfache sequentielle Schritte hinaus. Sie können Operationen gleichzeitig mit diesen Primitiven ausführenparallel(), Arrays mit verarbeitenmap(), auf externe Bedingungen warten und diese Primitive kombinierenwaitForCondition(), um zuverlässige Anwendungen zu erstellen. Jeder dauerhafte Vorgang erstellt seine eigenen Prüfpunkte, sodass Ihre Anwendung bei einer Unterbrechung jederzeit wiederhergestellt werden kann.
Onboarding-Prozesse für Benutzer
Führen Sie Benutzer durch die Registrierung, E-Mail-Überprüfung, Profileinrichtung und Erstkonfiguration mit Wiederholungsversuchen. In diesem Beispiel werden aufeinanderfolgende Schritte, Rückrufe und bedingte Logik kombiniert, um einen vollständigen Onboarding-Prozess zu erstellen.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
Der Prozess kombiniert aufeinanderfolgende Schritte mit Checkpoints für die Kontoerstellung und das Senden von E-Mails und wartet dann bis zu 48 Stunden lang auf die E-Mail-Bestätigung, ohne Ressourcen zu verbrauchen. Die bedingte Logik verarbeitet unterschiedliche Pfade, je nachdem, ob die Überprüfung abgeschlossen ist oder ob das Zeitlimit überschritten wird. Aufgaben zur Profileinrichtung werden gleichzeitig mit parallel Vorgängen ausgeführt, um die Gesamtausführungszeit zu reduzieren. Jeder Schritt wird bei vorübergehenden Ausfällen automatisch wiederholt, um sicherzustellen, dass das Onboarding zuverlässig abgeschlossen wird.
Stapelverarbeitung mit Checkpoints
Verarbeiten Sie Millionen von Datensätzen mit automatischer Wiederherstellung vom letzten erfolgreichen Checkpoint nach Ausfällen. Dieses Beispiel zeigt, wie langlebige Funktionen map() Operationen mit Chunking und Ratenbegrenzung kombinieren, um umfangreiche Datenverarbeitungen zu bewältigen.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
Datensätze werden in überschaubare Stapel aufgeteilt, um eine Überlastung des Speichers oder nachgelagerte Dienste zu vermeiden. Anschließend werden mehrere Batches gleichzeitig verarbeitet, wobei die Parallelität kontrolliert wird. maxConcurrency Jeder Stapel hat seinen eigenen Prüfpunkt, sodass bei Fehlern nur der fehlgeschlagene Stapel wiederholt wird, anstatt alle Datensätze erneut zu verarbeiten. Dieses Muster ist ideal für ETL-Jobs, Datenmigrationen oder Massenvorgänge, bei denen die Verarbeitung Stunden dauern kann.
Nächste Schritte