Casos de uso e exemplos
As funções duráveis do Lambda permitem que você crie aplicações com tolerância a falhas de várias etapas usando operações duráveis, como etapas e esperas. Com o ponto de verificação automático e um modelo de reprodução de ponto de verificação, em que a execução é reiniciada do início após a falha, mas ignora os pontos de verificação concluídos, suas funções podem se recuperar das falhas e retomar a execução sem perder o progresso.
Processos com tolerância a falhas de curta duração
Use funções duráveis para criar operações confiáveis que normalmente são concluídas em minutos. Embora esses processos sejam mais curtos do que os fluxos de trabalho de longa duração, eles ainda se beneficiam da verificação automática e da tolerância a falhas em sistemas distribuídos. Funções duráveis garantem que seus processos de várias etapas sejam concluídos com êxito, mesmo quando as chamadas de serviço individuais falham, sem exigir tratamento complexo de erros ou código de gerenciamento de estado.
Os cenários comuns incluem sistemas de reserva de hotéis, plataformas de reserva de restaurantes, solicitações de viagens compartilhadas, compra de ingressos para eventos e atualizações de assinaturas de SaaS. Esses cenários compartilham características comuns: várias chamadas de serviço que devem ser concluídas juntas, a necessidade de nova tentativa automática em caso de falhas transitórias e a exigência de manter um estado consistente em todos os sistemas distribuídos.
Transações distribuídas entre microsserviços
Coordene pagamentos, inventário e envios em vários serviços com reversão automática em caso de falhas. Cada operação de serviço é agrupada em uma etapa, garantindo que a transação possa ser recuperada a qualquer momento se um serviço falhar.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
Se alguma etapa falhar, a função se repetirá automaticamente a partir do último ponto de verificação com êxito. A reserva de inventário persiste mesmo se o processamento do pagamento falhar temporariamente. Quando a função tenta novamente, ela pula a etapa de inventário concluída e prossegue diretamente para o processamento do pagamento. Isso elimina reservas duplicadas e garante um estado consistente em todo o sistema distribuído.
Processamento de pedidos com várias etapas
Processe pedidos por meio de validação, autorização de pagamento, alocação de estoque e atendimento com nova tentativa e recuperação automáticas. Cada etapa é verificada, garantindo que o pedido avance mesmo que as etapas individuais falhem e sejam tentadas novamente.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
Esse padrão garante que os pedidos nunca fiquem presos em estados intermediários. Se a validação falhar, o pedido será rejeitado antes da autorização de pagamento. Se a autorização de pagamento falhar, o inventário não será alocado. Cada etapa se desenvolve a partir da anterior com nova tentativa e recuperação automáticas.
Observação
A verificação condicional if (!validation.itemsValid) está fora de uma etapa e será reexecutada durante a reprodução. Isso é seguro porque é determinístico: sempre produz o mesmo resultado com o mesmo objeto de validação.
Processos de longa execução
Use funções duráveis para processos que se estendem por horas, dias ou semanas. As operações de espera suspendem a execução sem incorrer em custos computacionais, tornando os processos de longa execução econômicos. Durante os períodos de espera, sua função interrompe a execução e o Lambda recicla o ambiente de execução. Quando chega a hora de retomar, o Lambda invoca sua função novamente e a reproduz a partir do último ponto de verificação.
Esse modelo de execução torna as funções duráveis ideais para processos que precisem ser pausados por longos períodos, seja aguardando decisões humanas, respostas externas do sistema, janelas de processamento programadas ou retardos baseados em tempo. Você paga somente pelo tempo de computação ativo, não pela espera.
Os cenários comuns incluem processos de aprovação de documentos, processamento em lote programado, processos de integração de vários dias, processos de teste de assinatura e sistemas de notificação atrasada. Esses cenários compartilham características comuns: períodos de espera estendidos medidos em horas ou dias, a necessidade de manter o estado de execução em todas essas esperas e requisitos econômicos em que pagar pelo tempo de computação ocioso é proibitivo.
Aprovações conm humano no loop
Pause a execução para revisões, aprovações ou decisões de documentos, mantendo o estado da execução. A função aguarda retornos de chamada externos sem consumir recursos, sendo retomada automaticamente quando a aprovação é recebida.
Esse padrão é essencial para processos que exijam julgamento humano ou validação externa. A função é suspensa no ponto de retorno de chamada, sem custos computacionais durante a espera. Quando alguém envia sua decisão via API, o Lambda invoca sua função novamente e a reproduz a partir do ponto de verificação, continuando com o resultado da aprovação.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
Quando o retorno de chamada é recebido e sua função é retomada, ela é reproduzida desde o início. A etapa de preparação do documento retorna instantaneamente o resultado de verificação. A operação waitForCallback também retorna instantaneamente com o resultado da aprovação armazenado, em vez de esperar novamente. A execução então continua até as etapas de finalização ou arquivamento.
Pipelines de dados de vários estágios
Processe grandes conjuntos de dados por meio de fases de extração, transformação e carregamento com pontos de verificação entre os estágios. Cada estágio pode levar horas para ser concluído, e os pontos de verificação garantem que o pipeline possa ser retomado a partir de qualquer estágio se for interrompido.
Esse padrão é ideal para fluxos de trabalho de ETL, migrações de dados ou trabalhos de processamento em lote em que você precise processar dados em estágios com pontos de recuperação entre eles. Se um estágio falhar, o pipeline é retomado a partir do último estágio concluído, em vez de ser reiniciado do início. Também é possível usar as operações de espera para pausar entre os estágios, respeitando os limites de taxa, esperando que os sistemas posteriores estejam prontos ou programando o processamento fora do horário de pico.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
Cada estágio é dividido em uma etapa, criando um ponto de verificação que permite que o pipeline seja retomado a partir de qualquer estágio se for interrompido. A espera de 5 minutos entre a extração e a transformação respeita os limites de taxa do sistema de origem sem consumir recursos computacionais, enquanto a espera até às 2 da manhã programa a dispendiosa operação de carregamento fora do horário de pico.
Observação
A chamada new Date() e a função calculateMsUntilHour() são etapas externas e serão reexecutadas durante a reprodução. Para operações baseadas em tempo que devam ser consistentes em todas as repetições, calcule o timestamp dentro de uma etapa ou use-o somente para períodos de espera (que são controlados).
Invocações encadeadas ao longo das funções
Invoque outras funções do Lambda de dentro de uma função durável usando context.invoke(). A função de chamada é suspensa enquanto aguarda a função invocada ser concluída, criando um ponto de verificação que preserva o resultado. Se a função de chamada for interrompida após a conclusão da função invocada, ela será retomada com o resultado armazenado sem invocar novamente a função.
Use esse padrão quando você tiver funções especializadas que lidem com domínios específicos (validação de clientes, processamento de pagamentos, gerenciamento de inventário) e precisar coordená-las em um fluxo de trabalho. Cada função mantém sua própria lógica e pode ser invocada por várias funções do orquestrador, evitando a duplicação de código.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
Cada invocação cria um ponto de verificação na função do orquestrador. Se o orquestrador for interrompido após a conclusão da validação do cliente, ele será retomado a partir desse ponto de verificação com os dados armazenados do cliente, ignorando a invocação da validação. Isso evita chamadas duplicadas para serviços posteriores e garante uma execução consistente em todas as interrupções.
As funções invocadas podem ser funções do Lambda duráveis ou padrão. Se você invocar uma função durável, ela poderá ter seu próprio fluxo de trabalho em várias etapas com esperas e pontos de verificação. O orquestrador simplesmente espera que a execução durável completa termine, recebendo o resultado final.
Não há suporte a invocações entre contas. Todas as funções invocadas devem estar na mesma conta da AWS da função de chamada.
Padrões avançados
Use funções duráveis para criar aplicações complexas de várias etapas que combinem várias operações duráveis, execução paralela, processamento de matrizes, lógica condicional e sondagem. Esses padrões permitem criar aplicações sofisticadas que coordenem muitas tarefas enquanto mantêm a tolerância a falhas e a recuperação automática.
Os padrões avançados vão além de simples etapas sequenciais. É possível executar operações simultaneamente com parallel(), processar matrizes com map(), aguardar condições externas com waitForCondition() e combinar essas primitivas para criar aplicações confiáveis. Cada operação durável cria seus próprios pontos de verificação, para que sua aplicação possa se recuperar de qualquer ponto se for interrompida.
Processos de integração de usuários
Guie os usuários durante o registro, a verificação de e-mail, a configuração do perfil e a configuração inicial com o tratamento de novas tentativas. Este exemplo combina etapas sequenciais, retornos de chamada e lógica condicional para criar um processo de integração completo.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
O processo combina etapas sequenciais com pontos de verificação para a criação da conta e envio de e-mail e, em seguida, faz uma pausa de até 48 horas aguardando a verificação do e-mail sem consumir recursos. A lógica condicional manipula caminhos diferentes com base na conclusão ou no tempo limite da verificação. As tarefas de configuração do perfil são executadas simultaneamente usando operações paralelas para reduzir o tempo total de execução, e cada etapa é repetida automaticamente em caso de falhas transitórias para garantir que a integração seja concluída de forma confiável.
Processamento em lote com pontos de verificação
Processe milhões de registros com recuperação automática do último ponto de verificação com êxito após falhas. Este exemplo demonstra como funções duráveis combinam map() operações com fragmentação e limitação de taxa para lidar com o processamento de dados em grande escala.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
Os registros são divididos em lotes gerenciáveis para evitar sobrecarregar a memória ou os serviços posteriores e, em seguida, vários lotes são processados simultaneamente com maxConcurrency controlando o paralelismo. Cada lote tem seu próprio ponto de verificação, portanto, as falhas apenas repetem o lote com falha, em vez de reprocessar todos os registros. Esse padrão é ideal para trabalhos de ETL, migrações de dados ou operações em massa em que o processamento pode levar horas.
Próximas etapas