Casos de uso e exemplos - AWS Lambda

Casos de uso e exemplos

As funções duráveis do Lambda permitem que você crie aplicações com tolerância a falhas de várias etapas usando operações duráveis, como etapas e esperas. Com o ponto de verificação automático e um modelo de reprodução de ponto de verificação, em que a execução é reiniciada do início após a falha, mas ignora os pontos de verificação concluídos, suas funções podem se recuperar das falhas e retomar a execução sem perder o progresso.

Processos com tolerância a falhas de curta duração

Use funções duráveis para criar operações confiáveis que normalmente são concluídas em minutos. Embora esses processos sejam mais curtos do que os fluxos de trabalho de longa duração, eles ainda se beneficiam da verificação automática e da tolerância a falhas em sistemas distribuídos. Funções duráveis garantem que seus processos de várias etapas sejam concluídos com êxito, mesmo quando as chamadas de serviço individuais falham, sem exigir tratamento complexo de erros ou código de gerenciamento de estado.

Os cenários comuns incluem sistemas de reserva de hotéis, plataformas de reserva de restaurantes, solicitações de viagens compartilhadas, compra de ingressos para eventos e atualizações de assinaturas de SaaS. Esses cenários compartilham características comuns: várias chamadas de serviço que devem ser concluídas juntas, a necessidade de nova tentativa automática em caso de falhas transitórias e a exigência de manter um estado consistente em todos os sistemas distribuídos.

Transações distribuídas entre microsserviços

Coordene pagamentos, inventário e envios em vários serviços com reversão automática em caso de falhas. Cada operação de serviço é agrupada em uma etapa, garantindo que a transação possa ser recuperada a qualquer momento se um serviço falhar.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, amount, items } = event; // Reserve inventory across multiple warehouses const inventory = await context.step("reserve-inventory", async () => { return await inventoryService.reserve(items); }); // Process payment const payment = await context.step("process-payment", async () => { return await paymentService.charge(amount); }); // Create shipment const shipment = await context.step("create-shipment", async () => { return await shippingService.createShipment(orderId, inventory); }); return { orderId, status: 'completed', shipment }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] amount = event['amount'] items = event['items'] # Reserve inventory across multiple warehouses inventory = context.step( lambda _: inventory_service.reserve(items), name='reserve-inventory' ) # Process payment payment = context.step( lambda _: payment_service.charge(amount), name='process-payment' ) # Create shipment shipment = context.step( lambda _: shipping_service.create_shipment(order_id, inventory), name='create-shipment' ) return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}

Se alguma etapa falhar, a função se repetirá automaticamente a partir do último ponto de verificação com êxito. A reserva de inventário persiste mesmo se o processamento do pagamento falhar temporariamente. Quando a função tenta novamente, ela pula a etapa de inventário concluída e prossegue diretamente para o processamento do pagamento. Isso elimina reservas duplicadas e garante um estado consistente em todo o sistema distribuído.

Processamento de pedidos com várias etapas

Processe pedidos por meio de validação, autorização de pagamento, alocação de estoque e atendimento com nova tentativa e recuperação automáticas. Cada etapa é verificada, garantindo que o pedido avance mesmo que as etapas individuais falhem e sejam tentadas novamente.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId, items } = event; // Validate order details const validation = await context.step("validate-order", async () => { const customer = await customerService.validate(customerId); const itemsValid = await inventoryService.validateItems(items); return { customer, itemsValid }; }); if (!validation.itemsValid) { return { orderId, status: 'rejected', reason: 'invalid_items' }; } // Authorize payment const authorization = await context.step("authorize-payment", async () => { return await paymentService.authorize( validation.customer.paymentMethod, calculateTotal(items) ); }); // Allocate inventory const allocation = await context.step("allocate-inventory", async () => { return await inventoryService.allocate(items); }); // Fulfill order const fulfillment = await context.step("fulfill-order", async () => { return await fulfillmentService.createShipment({ orderId, items: allocation.allocatedItems, address: validation.customer.shippingAddress }); }); return { orderId, status: 'completed', trackingNumber: fulfillment.trackingNumber }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] items = event['items'] # Validate order details def validate_order(_): customer = customer_service.validate(customer_id) items_valid = inventory_service.validate_items(items) return {'customer': customer, 'itemsValid': items_valid} validation = context.step(validate_order, name='validate-order') if not validation['itemsValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'} # Authorize payment authorization = context.step( lambda _: payment_service.authorize( validation['customer']['paymentMethod'], calculate_total(items) ), name='authorize-payment' ) # Allocate inventory allocation = context.step( lambda _: inventory_service.allocate(items), name='allocate-inventory' ) # Fulfill order fulfillment = context.step( lambda _: fulfillment_service.create_shipment({ 'orderId': order_id, 'items': allocation['allocatedItems'], 'address': validation['customer']['shippingAddress'] }), name='fulfill-order' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': fulfillment['trackingNumber'] }

Esse padrão garante que os pedidos nunca fiquem presos em estados intermediários. Se a validação falhar, o pedido será rejeitado antes da autorização de pagamento. Se a autorização de pagamento falhar, o inventário não será alocado. Cada etapa se desenvolve a partir da anterior com nova tentativa e recuperação automáticas.

Observação

A verificação condicional if (!validation.itemsValid) está fora de uma etapa e será reexecutada durante a reprodução. Isso é seguro porque é determinístico: sempre produz o mesmo resultado com o mesmo objeto de validação.

Processos de longa execução

Use funções duráveis para processos que se estendem por horas, dias ou semanas. As operações de espera suspendem a execução sem incorrer em custos computacionais, tornando os processos de longa execução econômicos. Durante os períodos de espera, sua função interrompe a execução e o Lambda recicla o ambiente de execução. Quando chega a hora de retomar, o Lambda invoca sua função novamente e a reproduz a partir do último ponto de verificação.

Esse modelo de execução torna as funções duráveis ideais para processos que precisem ser pausados por longos períodos, seja aguardando decisões humanas, respostas externas do sistema, janelas de processamento programadas ou retardos baseados em tempo. Você paga somente pelo tempo de computação ativo, não pela espera.

Os cenários comuns incluem processos de aprovação de documentos, processamento em lote programado, processos de integração de vários dias, processos de teste de assinatura e sistemas de notificação atrasada. Esses cenários compartilham características comuns: períodos de espera estendidos medidos em horas ou dias, a necessidade de manter o estado de execução em todas essas esperas e requisitos econômicos em que pagar pelo tempo de computação ocioso é proibitivo.

Aprovações conm humano no loop

Pause a execução para revisões, aprovações ou decisões de documentos, mantendo o estado da execução. A função aguarda retornos de chamada externos sem consumir recursos, sendo retomada automaticamente quando a aprovação é recebida.

Esse padrão é essencial para processos que exijam julgamento humano ou validação externa. A função é suspensa no ponto de retorno de chamada, sem custos computacionais durante a espera. Quando alguém envia sua decisão via API, o Lambda invoca sua função novamente e a reproduz a partir do ponto de verificação, continuando com o resultado da aprovação.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { documentId, reviewers } = event; // Step 1: Prepare document for review const prepared = await context.step("prepare-document", async () => { return await documentService.prepare(documentId); }); // Step 2: Request approval with callback const approval = await context.waitForCallback( "approval-callback", async (callbackId) => { await notificationService.sendApprovalRequest({ documentId, reviewers, callbackId, expiresIn: 86400 }); }, { timeout: { seconds: 86400 } } ); // Function resumes here when approval is received if (approval?.approved) { const finalized = await context.step("finalize-document", async () => { return await documentService.finalize(documentId, approval.comments); }); return { status: 'approved', documentId, finalizedAt: finalized.timestamp }; } // Handle rejection await context.step("archive-rejected", async () => { await documentService.archive(documentId, approval?.reason); }); return { status: 'rejected', documentId, reason: approval?.reason }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig @durable_execution def lambda_handler(event, context: DurableContext): document_id = event['documentId'] reviewers = event['reviewers'] # Step 1: Prepare document for review prepared = context.step( lambda _: document_service.prepare(document_id), name='prepare-document' ) # Step 2: Request approval with callback def send_approval_request(callback_id): notification_service.send_approval_request({ 'documentId': document_id, 'reviewers': reviewers, 'callbackId': callback_id, 'expiresIn': 86400 }) approval = context.wait_for_callback( send_approval_request, name='approval-callback', config=WaitConfig(timeout=86400) ) # Function resumes here when approval is received if approval and approval.get('approved'): finalized = context.step( lambda _: document_service.finalize(document_id, approval.get('comments')), name='finalize-document' ) return { 'status': 'approved', 'documentId': document_id, 'finalizedAt': finalized['timestamp'] } # Handle rejection context.step( lambda _: document_service.archive(document_id, approval.get('reason') if approval else None), name='archive-rejected' ) return { 'status': 'rejected', 'documentId': document_id, 'reason': approval.get('reason') if approval else None }

Quando o retorno de chamada é recebido e sua função é retomada, ela é reproduzida desde o início. A etapa de preparação do documento retorna instantaneamente o resultado de verificação. A operação waitForCallback também retorna instantaneamente com o resultado da aprovação armazenado, em vez de esperar novamente. A execução então continua até as etapas de finalização ou arquivamento.

Pipelines de dados de vários estágios

Processe grandes conjuntos de dados por meio de fases de extração, transformação e carregamento com pontos de verificação entre os estágios. Cada estágio pode levar horas para ser concluído, e os pontos de verificação garantem que o pipeline possa ser retomado a partir de qualquer estágio se for interrompido.

Esse padrão é ideal para fluxos de trabalho de ETL, migrações de dados ou trabalhos de processamento em lote em que você precise processar dados em estágios com pontos de recuperação entre eles. Se um estágio falhar, o pipeline é retomado a partir do último estágio concluído, em vez de ser reiniciado do início. Também é possível usar as operações de espera para pausar entre os estágios, respeitando os limites de taxa, esperando que os sistemas posteriores estejam prontos ou programando o processamento fora do horário de pico.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize } = event; // Stage 1: Extract data from source const extracted = await context.step("extract-data", async () => { const records = await sourceDatabase.extractRecords(datasetId); return { recordCount: records.length, records }; }); // Wait 5 minutes to respect source system rate limits await context.wait({ seconds: 300 }); // Stage 2: Transform data in batches const transformed = await context.step("transform-data", async () => { const batches = chunkArray(extracted.records, batchSize); const results = []; for (const batch of batches) { const transformed = await transformService.processBatch(batch); results.push(transformed); } return { batchCount: batches.length, results }; }); // Wait until off-peak hours (e.g., 2 AM) const now = new Date(); const targetHour = 2; const msUntilTarget = calculateMsUntilHour(now, targetHour); await context.wait({ seconds: Math.floor(msUntilTarget / 1000) }); // Stage 3: Load data to destination const loaded = await context.step("load-data", async () => { let loadedCount = 0; for (const result of transformed.results) { await destinationDatabase.loadBatch(result); loadedCount += result.length; } return { loadedCount }; }); // Stage 4: Verify and finalize const verified = await context.step("verify-pipeline", async () => { const verification = await destinationDatabase.verifyRecords(datasetId); await pipelineService.markComplete(datasetId, verification); return verification; }); return { datasetId, recordsProcessed: extracted.recordCount, batchesProcessed: transformed.batchCount, recordsLoaded: loaded.loadedCount, verified: verified.success }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event['batchSize'] # Stage 1: Extract data from source def extract_data(_): records = source_database.extract_records(dataset_id) return {'recordCount': len(records), 'records': records} extracted = context.step(extract_data, name='extract-data') # Wait 5 minutes to respect source system rate limits context.wait(300) # Stage 2: Transform data in batches def transform_data(_): batches = chunk_array(extracted['records'], batch_size) results = [] for batch in batches: transformed = transform_service.process_batch(batch) results.append(transformed) return {'batchCount': len(batches), 'results': results} transformed = context.step(transform_data, name='transform-data') # Wait until off-peak hours (e.g., 2 AM) now = datetime.now() target_hour = 2 ms_until_target = calculate_ms_until_hour(now, target_hour) context.wait(ms_until_target // 1000) # Stage 3: Load data to destination def load_data(_): loaded_count = 0 for result in transformed['results']: destination_database.load_batch(result) loaded_count += len(result) return {'loadedCount': loaded_count} loaded = context.step(load_data, name='load-data') # Stage 4: Verify and finalize def verify_pipeline(_): verification = destination_database.verify_records(dataset_id) pipeline_service.mark_complete(dataset_id, verification) return verification verified = context.step(verify_pipeline, name='verify-pipeline') return { 'datasetId': dataset_id, 'recordsProcessed': extracted['recordCount'], 'batchesProcessed': transformed['batchCount'], 'recordsLoaded': loaded['loadedCount'], 'verified': verified['success'] }

Cada estágio é dividido em uma etapa, criando um ponto de verificação que permite que o pipeline seja retomado a partir de qualquer estágio se for interrompido. A espera de 5 minutos entre a extração e a transformação respeita os limites de taxa do sistema de origem sem consumir recursos computacionais, enquanto a espera até às 2 da manhã programa a dispendiosa operação de carregamento fora do horário de pico.

Observação

A chamada new Date() e a função calculateMsUntilHour() são etapas externas e serão reexecutadas durante a reprodução. Para operações baseadas em tempo que devam ser consistentes em todas as repetições, calcule o timestamp dentro de uma etapa ou use-o somente para períodos de espera (que são controlados).

Invocações encadeadas ao longo das funções

Invoque outras funções do Lambda de dentro de uma função durável usando context.invoke(). A função de chamada é suspensa enquanto aguarda a função invocada ser concluída, criando um ponto de verificação que preserva o resultado. Se a função de chamada for interrompida após a conclusão da função invocada, ela será retomada com o resultado armazenado sem invocar novamente a função.

Use esse padrão quando você tiver funções especializadas que lidem com domínios específicos (validação de clientes, processamento de pagamentos, gerenciamento de inventário) e precisar coordená-las em um fluxo de trabalho. Cada função mantém sua própria lógica e pode ser invocada por várias funções do orquestrador, evitando a duplicação de código.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; // Main orchestrator function export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId } = event; // Step 1: Validate customer by invoking customer service function const customer = await context.invoke( "validate-customer", "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1", { customerId } ); if (!customer.isValid) { return { orderId, status: "rejected", reason: "invalid_customer" }; } // Step 2: Check inventory by invoking inventory service function const inventory = await context.invoke( "check-inventory", "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1", { orderId, items: event.items } ); if (!inventory.available) { return { orderId, status: "rejected", reason: "insufficient_inventory" }; } // Step 3: Process payment by invoking payment service function const payment = await context.invoke( "process-payment", "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1", { customerId, amount: inventory.totalAmount, paymentMethod: customer.paymentMethod } ); // Step 4: Create shipment by invoking fulfillment service function const shipment = await context.invoke( "create-shipment", "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1", { orderId, items: inventory.allocatedItems, address: customer.shippingAddress } ); return { orderId, status: "completed", trackingNumber: shipment.trackingNumber, estimatedDelivery: shipment.estimatedDelivery }; } );
Python
from aws_durable_execution_sdk_python import DurableContext, durable_execution # Main orchestrator function @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] # Step 1: Validate customer by invoking customer service function customer = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1', {'customerId': customer_id}, name='validate-customer' ) if not customer['isValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'} # Step 2: Check inventory by invoking inventory service function inventory = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1', {'orderId': order_id, 'items': event['items']}, name='check-inventory' ) if not inventory['available']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'} # Step 3: Process payment by invoking payment service function payment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1', { 'customerId': customer_id, 'amount': inventory['totalAmount'], 'paymentMethod': customer['paymentMethod'] }, name='process-payment' ) # Step 4: Create shipment by invoking fulfillment service function shipment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1', { 'orderId': order_id, 'items': inventory['allocatedItems'], 'address': customer['shippingAddress'] }, name='create-shipment' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': shipment['trackingNumber'], 'estimatedDelivery': shipment['estimatedDelivery'] }

Cada invocação cria um ponto de verificação na função do orquestrador. Se o orquestrador for interrompido após a conclusão da validação do cliente, ele será retomado a partir desse ponto de verificação com os dados armazenados do cliente, ignorando a invocação da validação. Isso evita chamadas duplicadas para serviços posteriores e garante uma execução consistente em todas as interrupções.

As funções invocadas podem ser funções do Lambda duráveis ou padrão. Se você invocar uma função durável, ela poderá ter seu próprio fluxo de trabalho em várias etapas com esperas e pontos de verificação. O orquestrador simplesmente espera que a execução durável completa termine, recebendo o resultado final.

nota

Não há suporte a invocações entre contas. Todas as funções invocadas devem estar na mesma conta da AWS da função de chamada.

Padrões avançados

Use funções duráveis para criar aplicações complexas de várias etapas que combinem várias operações duráveis, execução paralela, processamento de matrizes, lógica condicional e sondagem. Esses padrões permitem criar aplicações sofisticadas que coordenem muitas tarefas enquanto mantêm a tolerância a falhas e a recuperação automática.

Os padrões avançados vão além de simples etapas sequenciais. É possível executar operações simultaneamente com parallel(), processar matrizes com map(), aguardar condições externas com waitForCondition() e combinar essas primitivas para criar aplicações confiáveis. Cada operação durável cria seus próprios pontos de verificação, para que sua aplicação possa se recuperar de qualquer ponto se for interrompida.

Processos de integração de usuários

Guie os usuários durante o registro, a verificação de e-mail, a configuração do perfil e a configuração inicial com o tratamento de novas tentativas. Este exemplo combina etapas sequenciais, retornos de chamada e lógica condicional para criar um processo de integração completo.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { userId, email } = event; // Step 1: Create user account const user = await context.step("create-account", async () => { return await userService.createAccount(userId, email); }); // Step 2: Send verification email await context.step("send-verification", async () => { return await emailService.sendVerification(email); }); // Step 3: Wait for email verification (up to 48 hours) const verified = await context.waitForCallback( "email-verification", async (callbackId) => { await notificationService.sendVerificationLink({ email, callbackId, expiresIn: 172800 }); }, { timeout: { seconds: 172800 } } ); if (!verified) { await context.step("send-reminder", async () => { await emailService.sendReminder(email); }); return { status: "verification_timeout", userId, message: "Email verification not completed within 48 hours" }; } // Step 4: Initialize user profile in parallel const setupResults = await context.parallel("profile-setup", [ async (ctx: DurableContext) => { return await ctx.step("create-preferences", async () => { return await preferencesService.createDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("setup-notifications", async () => { return await notificationService.setupDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("create-welcome-content", async () => { return await contentService.createWelcome(userId); }); } ]); // Step 5: Send welcome email await context.step("send-welcome", async () => { const [preferences, notifications, content] = setupResults.getResults(); return await emailService.sendWelcome({ email, preferences, notifications, content }); }); return { status: "onboarding_complete", userId, completedAt: new Date().toISOString() }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): user_id = event['userId'] email = event['email'] # Step 1: Create user account user = context.step( lambda _: user_service.create_account(user_id, email), name='create-account' ) # Step 2: Send verification email context.step( lambda _: email_service.send_verification(email), name='send-verification' ) # Step 3: Wait for email verification (up to 48 hours) def send_verification_link(callback_id): notification_service.send_verification_link({ 'email': email, 'callbackId': callback_id, 'expiresIn': 172800 }) verified = context.wait_for_callback( send_verification_link, name='email-verification', config=WaitConfig(timeout=172800) ) if not verified: context.step( lambda _: email_service.send_reminder(email), name='send-reminder' ) return { 'status': 'verification_timeout', 'userId': user_id, 'message': 'Email verification not completed within 48 hours' } # Step 4: Initialize user profile in parallel def create_preferences(ctx: DurableContext): return ctx.step( lambda _: preferences_service.create_defaults(user_id), name='create-preferences' ) def setup_notifications(ctx: DurableContext): return ctx.step( lambda _: notification_service.setup_defaults(user_id), name='setup-notifications' ) def create_welcome_content(ctx: DurableContext): return ctx.step( lambda _: content_service.create_welcome(user_id), name='create-welcome-content' ) setup_results = context.parallel( [create_preferences, setup_notifications, create_welcome_content], name='profile-setup' ) # Step 5: Send welcome email def send_welcome(_): results = setup_results.get_results() preferences, notifications, content = results[0], results[1], results[2] return email_service.send_welcome({ 'email': email, 'preferences': preferences, 'notifications': notifications, 'content': content }) context.step(send_welcome, name='send-welcome') return { 'status': 'onboarding_complete', 'userId': user_id, 'completedAt': datetime.now().isoformat() }

O processo combina etapas sequenciais com pontos de verificação para a criação da conta e envio de e-mail e, em seguida, faz uma pausa de até 48 horas aguardando a verificação do e-mail sem consumir recursos. A lógica condicional manipula caminhos diferentes com base na conclusão ou no tempo limite da verificação. As tarefas de configuração do perfil são executadas simultaneamente usando operações paralelas para reduzir o tempo total de execução, e cada etapa é repetida automaticamente em caso de falhas transitórias para garantir que a integração seja concluída de forma confiável.

Processamento em lote com pontos de verificação

Processe milhões de registros com recuperação automática do último ponto de verificação com êxito após falhas. Este exemplo demonstra como funções duráveis combinam map() operações com fragmentação e limitação de taxa para lidar com o processamento de dados em grande escala.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; interface Batch { batchIndex: number; recordIds: string[]; } export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize = 1000 } = event; // Step 1: Get all record IDs to process const recordIds = await context.step("fetch-record-ids", async () => { return await dataService.getRecordIds(datasetId); }); // Step 2: Split into batches const batches: Batch[] = []; for (let i = 0; i < recordIds.length; i += batchSize) { batches.push({ batchIndex: Math.floor(i / batchSize), recordIds: recordIds.slice(i, i + batchSize) }); } // Step 3: Process batches with controlled concurrency const batchResults = await context.map( "process-batches", batches, async (ctx: DurableContext, batch: Batch, index: number) => { const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => { const results = []; for (const recordId of batch.recordIds) { const result = await recordService.process(recordId); results.push(result); } return results; }); const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => { return await validationService.validateBatch(processed); }); return { batchIndex: batch.batchIndex, recordCount: batch.recordIds.length, successCount: validated.successCount, failureCount: validated.failureCount }; }, { maxConcurrency: 5 } ); // Step 4: Aggregate results const summary = await context.step("aggregate-results", async () => { const results = batchResults.getResults(); const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0); const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0); return { datasetId, totalRecords: recordIds.length, batchesProcessed: batches.length, successCount: totalSuccess, failureCount: totalFailure, completedAt: new Date().toISOString() }; }); return summary; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig from datetime import datetime from typing import List, Dict @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event.get('batchSize', 1000) # Step 1: Get all record IDs to process record_ids = context.step( lambda _: data_service.get_record_ids(dataset_id), name='fetch-record-ids' ) # Step 2: Split into batches batches = [] for i in range(0, len(record_ids), batch_size): batches.append({ 'batchIndex': i // batch_size, 'recordIds': record_ids[i:i + batch_size] }) # Step 3: Process batches with controlled concurrency def process_batch(ctx: DurableContext, batch: Dict, index: int): batch_index = batch['batchIndex'] def process_records(_): results = [] for record_id in batch['recordIds']: result = record_service.process(record_id) results.append(result) return results processed = ctx.step(process_records, name=f'batch-{batch_index}') validated = ctx.step( lambda _: validation_service.validate_batch(processed), name=f'validate-{batch_index}' ) return { 'batchIndex': batch_index, 'recordCount': len(batch['recordIds']), 'successCount': validated['successCount'], 'failureCount': validated['failureCount'] } batch_results = context.map( process_batch, batches, name='process-batches', config=MapConfig(max_concurrency=5) ) # Step 4: Aggregate results def aggregate_results(_): results = batch_results.get_results() total_success = sum(r['successCount'] for r in results) total_failure = sum(r['failureCount'] for r in results) return { 'datasetId': dataset_id, 'totalRecords': len(record_ids), 'batchesProcessed': len(batches), 'successCount': total_success, 'failureCount': total_failure, 'completedAt': datetime.now().isoformat() } summary = context.step(aggregate_results, name='aggregate-results') return summary

Os registros são divididos em lotes gerenciáveis para evitar sobrecarregar a memória ou os serviços posteriores e, em seguida, vários lotes são processados simultaneamente com maxConcurrency controlando o paralelismo. Cada lote tem seu próprio ponto de verificação, portanto, as falhas apenas repetem o lote com falha, em vez de reprocessar todos os registros. Esse padrão é ideal para trabalhos de ETL, migrações de dados ou operações em massa em que o processamento pode levar horas.

Próximas etapas