Ejemplos y casos de uso - AWS Lambda

Ejemplos y casos de uso

Las funciones duraderas de Lambda le permiten crear aplicaciones de varios pasos con tolerancia a errores mediante operaciones duraderas como pasos y esperas. Con la creación automática de puntos de control y un modelo de reproducción de puntos de control, en el que la ejecución se reinicia desde el principio tras un error, pero omite los puntos de control completados, sus funciones pueden recuperarse de los errores y reanudar la ejecución sin perder el progreso.

Procesos tolerantes a errores de corta duración

Utilice funciones duraderas para crear operaciones fiables que, por lo general, se completan en cuestión de minutos. Si bien estos procesos son más cortos que los flujos de trabajo de larga duración, se benefician de la creación automática de puntos de control y de la tolerancia a errores en todos los sistemas distribuidos. Las funciones duraderas garantizan que sus procesos de varios pasos se completen correctamente incluso cuando las llamadas de servicio individuales fallan, sin necesidad de una compleja gestión de errores ni de un código de gestión del estado.

Los escenarios más comunes incluyen los sistemas de reserva de hoteles, las plataformas de reserva de restaurantes, las solicitudes de viajes compartidos, la compra de entradas para eventos y las actualizaciones de suscripciones de SaaS. Estos escenarios comparten características comunes: varias llamadas de servicio que deben completarse juntas, la necesidad de reintento automático en caso de errores transitorios y el requisito de mantener un estado uniforme en todos los sistemas distribuidos.

Transacciones distribuidas en todos los microservicios

Coordine los pagos, el inventario y los envíos entre varios servicios con la reversión automática de los errores. Cada operación de servicio se encapsula en un paso, lo que garantiza que la transacción se pueda recuperar desde cualquier punto en caso de que un servicio falle.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, amount, items } = event; // Reserve inventory across multiple warehouses const inventory = await context.step("reserve-inventory", async () => { return await inventoryService.reserve(items); }); // Process payment const payment = await context.step("process-payment", async () => { return await paymentService.charge(amount); }); // Create shipment const shipment = await context.step("create-shipment", async () => { return await shippingService.createShipment(orderId, inventory); }); return { orderId, status: 'completed', shipment }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] amount = event['amount'] items = event['items'] # Reserve inventory across multiple warehouses inventory = context.step( lambda _: inventory_service.reserve(items), name='reserve-inventory' ) # Process payment payment = context.step( lambda _: payment_service.charge(amount), name='process-payment' ) # Create shipment shipment = context.step( lambda _: shipping_service.create_shipment(order_id, inventory), name='create-shipment' ) return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}

Si algún paso falla, la función lo reintenta automáticamente desde el último punto de control correcto. La reserva de inventario se mantiene aunque el procesamiento del pago falle temporalmente. Cuando la función hace un reintento, se salta el paso de inventario completado y pasa directamente al procesamiento del pago. Esto elimina las reservas duplicadas y garantiza un estado uniforme en todo el sistema distribuido.

Procesamiento de pedidos en varios pasos

Procese los pedidos mediante la validación, la autorización de pago, la asignación de inventario y el cumplimiento con reintentos y recuperación automáticos. Cada paso tiene un punto de control, lo que garantiza que el pedido avance incluso si los pasos individuales fallan y se reintentan.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId, items } = event; // Validate order details const validation = await context.step("validate-order", async () => { const customer = await customerService.validate(customerId); const itemsValid = await inventoryService.validateItems(items); return { customer, itemsValid }; }); if (!validation.itemsValid) { return { orderId, status: 'rejected', reason: 'invalid_items' }; } // Authorize payment const authorization = await context.step("authorize-payment", async () => { return await paymentService.authorize( validation.customer.paymentMethod, calculateTotal(items) ); }); // Allocate inventory const allocation = await context.step("allocate-inventory", async () => { return await inventoryService.allocate(items); }); // Fulfill order const fulfillment = await context.step("fulfill-order", async () => { return await fulfillmentService.createShipment({ orderId, items: allocation.allocatedItems, address: validation.customer.shippingAddress }); }); return { orderId, status: 'completed', trackingNumber: fulfillment.trackingNumber }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] items = event['items'] # Validate order details def validate_order(_): customer = customer_service.validate(customer_id) items_valid = inventory_service.validate_items(items) return {'customer': customer, 'itemsValid': items_valid} validation = context.step(validate_order, name='validate-order') if not validation['itemsValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'} # Authorize payment authorization = context.step( lambda _: payment_service.authorize( validation['customer']['paymentMethod'], calculate_total(items) ), name='authorize-payment' ) # Allocate inventory allocation = context.step( lambda _: inventory_service.allocate(items), name='allocate-inventory' ) # Fulfill order fulfillment = context.step( lambda _: fulfillment_service.create_shipment({ 'orderId': order_id, 'items': allocation['allocatedItems'], 'address': validation['customer']['shippingAddress'] }), name='fulfill-order' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': fulfillment['trackingNumber'] }

Este patrón garantiza que los pedidos nunca se atasquen en estados intermedios. Si la validación falla, el pedido se rechaza antes de la autorización de pago. Si se produce un error en la autorización de pago, el inventario no se asigna. Cada paso se basa en el anterior con un reintento y una recuperación automáticos.

Nota

La comprobación condicional if (!validation.itemsValid) está fuera de un paso y volverá a ejecutarse durante la reproducción. Esto es seguro porque es determinista: siempre produce el mismo resultado con el mismo objeto de validación.

Procesos de larga duración

Utilice funciones duraderas para procesos que abarquen horas, días o semanas. Las operaciones de espera suspenden la ejecución sin incurrir en gastos de cómputo, lo que hace que los procesos de larga duración sean rentables. Durante los períodos de espera, la función deja de ejecutarse y Lambda recicla el entorno de ejecución. Cuando llega el momento de reanudarla, Lambda vuelve a invocar la función y la reproduce desde el último punto de control.

Este modelo de ejecución hace que las funciones duraderas sean ideales para los procesos que necesitan pausarse durante períodos prolongados, ya sea a la espera de una decisión humana, respuestas de un sistema externo, períodos de procesamiento programados o retrasos basados en el tiempo. Solo paga por el tiempo de cómputo activo, no por la espera.

Los escenarios más frecuentes incluyen los procesos de aprobación de documentos, el procesamiento por lotes programado, los procesos de incorporación de varios días, los procesos de prueba de suscripciones y los sistemas de notificación diferida. Estos escenarios comparten características comunes: períodos de espera prolongados medidos en horas o días, la necesidad de mantener el estado de ejecución durante esas esperas y requisitos que tengan en cuenta los costos, en los que pagar por el tiempo de cómputo inactivo es prohibitivo.

Aprobaciones con intervención humana

Pause la ejecución para revisar o aprobar documentos o tomar decisiones sobre estos y, al mismo tiempo, mantenga el estado de ejecución. La función espera las devoluciones de llamada externas sin consumir recursos y se reanuda automáticamente cuando se recibe la aprobación.

Este patrón es esencial para los procesos que requieren el juicio humano o la validación externa. La función se suspende en el punto de devolución de llamada, por lo que no se incurre en gastos de cómputo durante la espera. Cuando alguien envía su decisión a través de la API, Lambda vuelve a invocar la función y la reproduce desde el punto de control; de este modo, continúa con el resultado de la aprobación.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { documentId, reviewers } = event; // Step 1: Prepare document for review const prepared = await context.step("prepare-document", async () => { return await documentService.prepare(documentId); }); // Step 2: Request approval with callback const approval = await context.waitForCallback( "approval-callback", async (callbackId) => { await notificationService.sendApprovalRequest({ documentId, reviewers, callbackId, expiresIn: 86400 }); }, { timeout: { seconds: 86400 } } ); // Function resumes here when approval is received if (approval?.approved) { const finalized = await context.step("finalize-document", async () => { return await documentService.finalize(documentId, approval.comments); }); return { status: 'approved', documentId, finalizedAt: finalized.timestamp }; } // Handle rejection await context.step("archive-rejected", async () => { await documentService.archive(documentId, approval?.reason); }); return { status: 'rejected', documentId, reason: approval?.reason }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig @durable_execution def lambda_handler(event, context: DurableContext): document_id = event['documentId'] reviewers = event['reviewers'] # Step 1: Prepare document for review prepared = context.step( lambda _: document_service.prepare(document_id), name='prepare-document' ) # Step 2: Request approval with callback def send_approval_request(callback_id): notification_service.send_approval_request({ 'documentId': document_id, 'reviewers': reviewers, 'callbackId': callback_id, 'expiresIn': 86400 }) approval = context.wait_for_callback( send_approval_request, name='approval-callback', config=WaitConfig(timeout=86400) ) # Function resumes here when approval is received if approval and approval.get('approved'): finalized = context.step( lambda _: document_service.finalize(document_id, approval.get('comments')), name='finalize-document' ) return { 'status': 'approved', 'documentId': document_id, 'finalizedAt': finalized['timestamp'] } # Handle rejection context.step( lambda _: document_service.archive(document_id, approval.get('reason') if approval else None), name='archive-rejected' ) return { 'status': 'rejected', 'documentId': document_id, 'reason': approval.get('reason') if approval else None }

Cuando se recibe la llamada y la función se reanuda, se reproduce desde el principio. El paso de preparación del documento devuelve el resultado del punto de control al instante. La operación waitForCallback también regresa al instante con el resultado de la aprobación almacenado, en lugar de tener que volver a esperar. Luego, la ejecución continúa con los pasos de finalización o archivo.

Canalizaciones de datos de varias etapas

Procese conjuntos de datos de gran tamaño durante las fases de extracción, transformación y carga con puntos de control entre las etapas. Cada etapa puede tardar horas en completarse, y los puntos de control garantizan que la canalización pueda reanudarse desde cualquier etapa si se interrumpe.

Este patrón es ideal para los flujos de trabajo de ETL, las migraciones de datos o los trabajos de procesamiento por lotes en los que es necesario procesar los datos por etapas con puntos de recuperación entre ellos. Si una etapa falla, la canalización se reanuda desde la última etapa completada en lugar de reiniciarse desde el principio. También puede utilizar las operaciones de espera para hacer pausas entre etapas, respetando los límites de tasa, esperando a que los sistemas posteriores estén listos o programando el procesamiento durante las horas de menor actividad.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize } = event; // Stage 1: Extract data from source const extracted = await context.step("extract-data", async () => { const records = await sourceDatabase.extractRecords(datasetId); return { recordCount: records.length, records }; }); // Wait 5 minutes to respect source system rate limits await context.wait({ seconds: 300 }); // Stage 2: Transform data in batches const transformed = await context.step("transform-data", async () => { const batches = chunkArray(extracted.records, batchSize); const results = []; for (const batch of batches) { const transformed = await transformService.processBatch(batch); results.push(transformed); } return { batchCount: batches.length, results }; }); // Wait until off-peak hours (e.g., 2 AM) const now = new Date(); const targetHour = 2; const msUntilTarget = calculateMsUntilHour(now, targetHour); await context.wait({ seconds: Math.floor(msUntilTarget / 1000) }); // Stage 3: Load data to destination const loaded = await context.step("load-data", async () => { let loadedCount = 0; for (const result of transformed.results) { await destinationDatabase.loadBatch(result); loadedCount += result.length; } return { loadedCount }; }); // Stage 4: Verify and finalize const verified = await context.step("verify-pipeline", async () => { const verification = await destinationDatabase.verifyRecords(datasetId); await pipelineService.markComplete(datasetId, verification); return verification; }); return { datasetId, recordsProcessed: extracted.recordCount, batchesProcessed: transformed.batchCount, recordsLoaded: loaded.loadedCount, verified: verified.success }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event['batchSize'] # Stage 1: Extract data from source def extract_data(_): records = source_database.extract_records(dataset_id) return {'recordCount': len(records), 'records': records} extracted = context.step(extract_data, name='extract-data') # Wait 5 minutes to respect source system rate limits context.wait(300) # Stage 2: Transform data in batches def transform_data(_): batches = chunk_array(extracted['records'], batch_size) results = [] for batch in batches: transformed = transform_service.process_batch(batch) results.append(transformed) return {'batchCount': len(batches), 'results': results} transformed = context.step(transform_data, name='transform-data') # Wait until off-peak hours (e.g., 2 AM) now = datetime.now() target_hour = 2 ms_until_target = calculate_ms_until_hour(now, target_hour) context.wait(ms_until_target // 1000) # Stage 3: Load data to destination def load_data(_): loaded_count = 0 for result in transformed['results']: destination_database.load_batch(result) loaded_count += len(result) return {'loadedCount': loaded_count} loaded = context.step(load_data, name='load-data') # Stage 4: Verify and finalize def verify_pipeline(_): verification = destination_database.verify_records(dataset_id) pipeline_service.mark_complete(dataset_id, verification) return verification verified = context.step(verify_pipeline, name='verify-pipeline') return { 'datasetId': dataset_id, 'recordsProcessed': extracted['recordCount'], 'batchesProcessed': transformed['batchCount'], 'recordsLoaded': loaded['loadedCount'], 'verified': verified['success'] }

Cada etapa se encapsula en un paso, lo que crea un punto de control que permite que la canalización se reanude desde cualquier etapa si se interrumpe. La espera de 5 minutos entre la extracción y la transformación respeta los límites de tasa del sistema de origen sin consumir recursos de cómputo, mientras que la espera hasta las 2 de la madrugada programa la costosa operación de carga fuera de las horas pico.

Nota

La llamada new Date() y la función calculateMsUntilHour() están fuera de los pasos y volverán a ejecutarse durante la reproducción. En el caso de las operaciones basadas en el tiempo que deben ser coherentes en todas las reproducciones, calcule la marca de tiempo dentro de un paso o utilícela solo para los períodos de espera (que vienen determinados por puntos de control).

Invocaciones encadenadas entre funciones

Invoque otras funciones de Lambda desde dentro de una función duradera mediante context.invoke(). La función de llamada se suspende mientras espera que se complete la función invocada, lo que crea un punto de control que conserva el resultado. Si la función de llamada se interrumpe una vez completada la función invocada, se reanudará con el resultado almacenado sin volver a invocar la función.

Utilice este patrón cuando tenga funciones especializadas que gestionen dominios específicos (validación de clientes, procesamiento de pagos, gestión de inventario) y necesite coordinarlas en un flujo de trabajo. Cada función mantiene su propia lógica y puede ser invocada por varias funciones del orquestador, lo que evita la duplicación de código.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; // Main orchestrator function export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId } = event; // Step 1: Validate customer by invoking customer service function const customer = await context.invoke( "validate-customer", "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1", { customerId } ); if (!customer.isValid) { return { orderId, status: "rejected", reason: "invalid_customer" }; } // Step 2: Check inventory by invoking inventory service function const inventory = await context.invoke( "check-inventory", "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1", { orderId, items: event.items } ); if (!inventory.available) { return { orderId, status: "rejected", reason: "insufficient_inventory" }; } // Step 3: Process payment by invoking payment service function const payment = await context.invoke( "process-payment", "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1", { customerId, amount: inventory.totalAmount, paymentMethod: customer.paymentMethod } ); // Step 4: Create shipment by invoking fulfillment service function const shipment = await context.invoke( "create-shipment", "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1", { orderId, items: inventory.allocatedItems, address: customer.shippingAddress } ); return { orderId, status: "completed", trackingNumber: shipment.trackingNumber, estimatedDelivery: shipment.estimatedDelivery }; } );
Python
from aws_durable_execution_sdk_python import DurableContext, durable_execution # Main orchestrator function @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] # Step 1: Validate customer by invoking customer service function customer = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1', {'customerId': customer_id}, name='validate-customer' ) if not customer['isValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'} # Step 2: Check inventory by invoking inventory service function inventory = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1', {'orderId': order_id, 'items': event['items']}, name='check-inventory' ) if not inventory['available']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'} # Step 3: Process payment by invoking payment service function payment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1', { 'customerId': customer_id, 'amount': inventory['totalAmount'], 'paymentMethod': customer['paymentMethod'] }, name='process-payment' ) # Step 4: Create shipment by invoking fulfillment service function shipment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1', { 'orderId': order_id, 'items': inventory['allocatedItems'], 'address': customer['shippingAddress'] }, name='create-shipment' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': shipment['trackingNumber'], 'estimatedDelivery': shipment['estimatedDelivery'] }

Cada invocación crea un punto de control en la función del orquestador. Si el orquestador se interrumpe una vez finalizada la validación del cliente, se reanudará desde ese punto de control con los datos del cliente almacenados y omitirá la invocación de validación. De este modo, se evitan las llamadas duplicadas a los servicios posteriores y se garantiza una ejecución uniforme en todas las interrupciones.

Las funciones invocadas pueden ser funciones duraderas o estándar de Lambda. Si invoca una función duradera, puede tener su propio flujo de trabajo de varios pasos con esperas y puntos de control. El orquestador simplemente espera a que finalice la ejecución duradera completa para recibir el resultado final.

nota

No se admiten invocaciones entre cuentas. Todas las funciones invocadas deben estar en la misma cuenta de AWS que la función de llamada.

Patrones avanzados

Utilice funciones duraderas para crear aplicaciones complejas de varios pasos que combinen múltiples operaciones duraderas, ejecución paralela, procesamiento de matrices, lógica condicional y sondeos. Estos patrones le permiten crear aplicaciones sofisticadas que coordinan muchas tareas y, al mismo tiempo, mantienen la tolerancia a errores y la recuperación automática.

Los patrones avanzados van más allá de simples pasos secuenciales. Puede ejecutar operaciones simultáneamente con parallel(), procesar matrices con map(), esperar a que se produzcan condiciones externas con waitForCondition() y combinar estas primitivas para crear aplicaciones fiables. Cada operación duradera crea sus propios puntos de control, por lo que la aplicación puede recuperarse desde cualquier punto en caso de interrupción.

Procesos de incorporación de usuarios

Guíe a los usuarios a través del registro, la verificación del correo electrónico, la configuración del perfil y la configuración inicial mediante la gestión de reintentos. En este ejemplo, se combinan pasos secuenciales, devoluciones de llamada y lógica condicional para crear un proceso de incorporación completo.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { userId, email } = event; // Step 1: Create user account const user = await context.step("create-account", async () => { return await userService.createAccount(userId, email); }); // Step 2: Send verification email await context.step("send-verification", async () => { return await emailService.sendVerification(email); }); // Step 3: Wait for email verification (up to 48 hours) const verified = await context.waitForCallback( "email-verification", async (callbackId) => { await notificationService.sendVerificationLink({ email, callbackId, expiresIn: 172800 }); }, { timeout: { seconds: 172800 } } ); if (!verified) { await context.step("send-reminder", async () => { await emailService.sendReminder(email); }); return { status: "verification_timeout", userId, message: "Email verification not completed within 48 hours" }; } // Step 4: Initialize user profile in parallel const setupResults = await context.parallel("profile-setup", [ async (ctx: DurableContext) => { return await ctx.step("create-preferences", async () => { return await preferencesService.createDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("setup-notifications", async () => { return await notificationService.setupDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("create-welcome-content", async () => { return await contentService.createWelcome(userId); }); } ]); // Step 5: Send welcome email await context.step("send-welcome", async () => { const [preferences, notifications, content] = setupResults.getResults(); return await emailService.sendWelcome({ email, preferences, notifications, content }); }); return { status: "onboarding_complete", userId, completedAt: new Date().toISOString() }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): user_id = event['userId'] email = event['email'] # Step 1: Create user account user = context.step( lambda _: user_service.create_account(user_id, email), name='create-account' ) # Step 2: Send verification email context.step( lambda _: email_service.send_verification(email), name='send-verification' ) # Step 3: Wait for email verification (up to 48 hours) def send_verification_link(callback_id): notification_service.send_verification_link({ 'email': email, 'callbackId': callback_id, 'expiresIn': 172800 }) verified = context.wait_for_callback( send_verification_link, name='email-verification', config=WaitConfig(timeout=172800) ) if not verified: context.step( lambda _: email_service.send_reminder(email), name='send-reminder' ) return { 'status': 'verification_timeout', 'userId': user_id, 'message': 'Email verification not completed within 48 hours' } # Step 4: Initialize user profile in parallel def create_preferences(ctx: DurableContext): return ctx.step( lambda _: preferences_service.create_defaults(user_id), name='create-preferences' ) def setup_notifications(ctx: DurableContext): return ctx.step( lambda _: notification_service.setup_defaults(user_id), name='setup-notifications' ) def create_welcome_content(ctx: DurableContext): return ctx.step( lambda _: content_service.create_welcome(user_id), name='create-welcome-content' ) setup_results = context.parallel( [create_preferences, setup_notifications, create_welcome_content], name='profile-setup' ) # Step 5: Send welcome email def send_welcome(_): results = setup_results.get_results() preferences, notifications, content = results[0], results[1], results[2] return email_service.send_welcome({ 'email': email, 'preferences': preferences, 'notifications': notifications, 'content': content }) context.step(send_welcome, name='send-welcome') return { 'status': 'onboarding_complete', 'userId': user_id, 'completedAt': datetime.now().isoformat() }

El proceso combina pasos secuenciales con puntos de control para la creación de cuentas y el envío de correos electrónicos, y luego se detiene hasta 48 horas a la espera de que se verifique el correo electrónico sin consumir recursos. La lógica condicional gestiona diferentes rutas en función de si la verificación se completa o si se agota el tiempo de espera. Las tareas de configuración de perfiles se ejecutan simultáneamente mediante operaciones en paralelo para reducir el tiempo total de ejecución, y cada paso se reintenta automáticamente en caso de errores transitorios para garantizar que la incorporación se complete de forma fiable.

Procesamiento por lotes con puntos de control

Procese millones de registros con recuperación automática desde el último punto de control exitoso después de un error. Este ejemplo demuestra cómo las funciones duraderas combinan las operaciones de map() con la fragmentación y la limitación de tasa para gestionar el procesamiento de datos a gran escala.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; interface Batch { batchIndex: number; recordIds: string[]; } export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize = 1000 } = event; // Step 1: Get all record IDs to process const recordIds = await context.step("fetch-record-ids", async () => { return await dataService.getRecordIds(datasetId); }); // Step 2: Split into batches const batches: Batch[] = []; for (let i = 0; i < recordIds.length; i += batchSize) { batches.push({ batchIndex: Math.floor(i / batchSize), recordIds: recordIds.slice(i, i + batchSize) }); } // Step 3: Process batches with controlled concurrency const batchResults = await context.map( "process-batches", batches, async (ctx: DurableContext, batch: Batch, index: number) => { const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => { const results = []; for (const recordId of batch.recordIds) { const result = await recordService.process(recordId); results.push(result); } return results; }); const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => { return await validationService.validateBatch(processed); }); return { batchIndex: batch.batchIndex, recordCount: batch.recordIds.length, successCount: validated.successCount, failureCount: validated.failureCount }; }, { maxConcurrency: 5 } ); // Step 4: Aggregate results const summary = await context.step("aggregate-results", async () => { const results = batchResults.getResults(); const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0); const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0); return { datasetId, totalRecords: recordIds.length, batchesProcessed: batches.length, successCount: totalSuccess, failureCount: totalFailure, completedAt: new Date().toISOString() }; }); return summary; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig from datetime import datetime from typing import List, Dict @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event.get('batchSize', 1000) # Step 1: Get all record IDs to process record_ids = context.step( lambda _: data_service.get_record_ids(dataset_id), name='fetch-record-ids' ) # Step 2: Split into batches batches = [] for i in range(0, len(record_ids), batch_size): batches.append({ 'batchIndex': i // batch_size, 'recordIds': record_ids[i:i + batch_size] }) # Step 3: Process batches with controlled concurrency def process_batch(ctx: DurableContext, batch: Dict, index: int): batch_index = batch['batchIndex'] def process_records(_): results = [] for record_id in batch['recordIds']: result = record_service.process(record_id) results.append(result) return results processed = ctx.step(process_records, name=f'batch-{batch_index}') validated = ctx.step( lambda _: validation_service.validate_batch(processed), name=f'validate-{batch_index}' ) return { 'batchIndex': batch_index, 'recordCount': len(batch['recordIds']), 'successCount': validated['successCount'], 'failureCount': validated['failureCount'] } batch_results = context.map( process_batch, batches, name='process-batches', config=MapConfig(max_concurrency=5) ) # Step 4: Aggregate results def aggregate_results(_): results = batch_results.get_results() total_success = sum(r['successCount'] for r in results) total_failure = sum(r['failureCount'] for r in results) return { 'datasetId': dataset_id, 'totalRecords': len(record_ids), 'batchesProcessed': len(batches), 'successCount': total_success, 'failureCount': total_failure, 'completedAt': datetime.now().isoformat() } summary = context.step(aggregate_results, name='aggregate-results') return summary

Los registros se dividen en lotes administrables para evitar sobrecargar la memoria o los servicios posteriores. A continuación, se procesan varios lotes simultáneamente con maxConcurrency en control del paralelismo. Cada lote tiene su propio punto de control, por lo que los errores reintentan solo el lote fallido en lugar de volver a procesar todos los registros. Este patrón es ideal para trabajos de ETL, migraciones de datos u operaciones masivas en las que el procesamiento puede tardar horas.

Siguientes pasos