예제 및 사용 사례
Lambda 지속성 함수를 사용하면 단계 및 대기와 같은 지속성 작업을 사용하여 내결함성 다단계 애플리케이션을 빌드할 수 있습니다. 자동 체크포인트 지정과 실패 후 처음부터 실행이 다시 시작되지만 완료된 체크포인트를 건너뛰는 체크포인트-재생 모델을 사용하면 함수가 실패로부터 복구되어 진행 상황 손실 없이 실행을 재개할 수 있습니다.
단기 내결함성 프로세스
지속성 함수를 사용하여 일반적으로 몇 분 내에 완료되는 안정적인 작업을 빌드합니다. 이러한 프로세스는 장기 실행 워크플로보다 짧지만 분산 시스템에서 자동 체크포인트 지정 및 내결함성의 이점을 계속해서 활용할 수 있습니다. 지속성 함수를 사용하면 복잡한 오류 처리 또는 상태 관리 코드 없이 개별 서비스 직접 호출이 실패하더라도 다단계 프로세스가 성공적으로 완료됩니다.
일반적인 시나리오로는 호텔 예약 시스템, 레스토랑 예약 플랫폼, 차량 공유 운행 요청, 이벤트 티켓 구매, SaaS 구독 업그레이드 등이 있습니다. 이러한 시나리오는 공통의 특징, 즉 함께 완료해야 하는 여러 서비스 직접 호출, 일시적 실패에 대한 자동 재시도 필요성, 분산 시스템에서 일관된 상태를 유지해야 한다는 요구 사항을 공유합니다.
마이크로서비스 간 분산 트랜잭션
실패 시 자동 롤백을 통해 여러 서비스에서 결제, 재고 및 배송을 조정합니다. 각 서비스 작업은 단계로 래핑되므로 서비스가 실패할 경우 트랜잭션이 어느 시점에서든 복구될 수 있습니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
단계가 실패하면 함수는 마지막으로 성공한 체크포인트에서 자동으로 재시도합니다. 재고 예약은 결제 처리가 일시적으로 실패하더라도 유지됩니다. 함수가 재시도되면 완료된 재고 단계를 건너뛰고 결제 처리가 바로 진행됩니다. 이를 통해 중복 예약이 제거되고 분산 시스템에서 일관된 상태가 보장됩니다.
여러 단계를 통한 주문 처리
자동 재시도와 복구를 활용하여 검증, 결제 권한 부여, 재고 할당, 이행을 통해 주문을 처리합니다. 각 단계에 체크포인트가 지정되어 개별 단계가 실패하고 재시도되더라도 순서는 진행됩니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
이 패턴을 활용하면 주문이 중간 상태에서 멈추지 않습니다. 검증에 실패하면 결제 권한 부여 전에 주문이 거부됩니다. 결제 권한 부여에 실패하면 재고가 할당되지 않습니다. 각 단계는 이전 단계를 기반으로 하고 자동 재시도 및 복구를 활용합니다.
Note
조건부 검사 if (!validation.itemsValid)는 단계에서 벗어나며 재생 중에 다시 실행됩니다. 이는 결정론적이므로 안전합니다. 동일한 검증 객체는 항상 동일한 결과를 생성합니다.
장기 실행 프로세스
몇 시간, 며칠 또는 몇 주에 걸친 프로세스에 지속성 함수를 사용합니다. 대기 작업은 컴퓨팅 요금을 발생시키지 않고 실행을 일시 중지하므로 장기 실행 프로세스의 비용 효과성이 향상됩니다. 대기 기간 동안 함수는 실행을 중지하고 Lambda는 실행 환경을 재활용합니다. 재개할 때가 되면 Lambda는 함수를 다시 간접 호출하고 마지막 체크포인트에서 재생합니다.
이 실행 모델은 인적 결정, 외부 시스템 응답, 예약된 처리 기간 또는 시간 기반 지연을 기다리는 등 장기간 일시 중지해야 하는 프로세스에 적합한 지속성 함수를 제공합니다. 활성 컴퓨팅 시간에만 비용을 지불하고 대기 시간에는 비용을 지불하지 않습니다.
일반적인 시나리오로는 문서 승인 프로세스, 예약된 배치 처리, 며칠이 걸리는 온보딩 프로세스, 구독 평가판 프로세스, 지연 알림 시스템이 있습니다. 이러한 시나리오의 일반적인 특징으로는 몇 시간 또는 며칠 단위로 측정되는 장시간 대기, 해당 대기 시간 동안 실행 상태를 유지해야 하는 필요성, 유휴 컴퓨팅 시간에 대한 비용 지불을 금지하는 비용에 민감한 요구 사항 등이 있습니다.
휴먼 인 더 루프 승인
실행 상태를 유지하면서 문서 검토, 승인 또는 결정을 위해 실행을 일시 중지합니다. 이 함수는 리소스를 소비하지 않고 외부 콜백을 대기하며, 승인을 받으면 자동으로 재개됩니다.
이 패턴은 사람의 판단이나 외부 검증이 필요한 프로세스에 반드시 필요합니다. 함수는 콜백 지점에서 일시 중지되어 대기 중에 컴퓨팅 요금이 발생하지 않습니다. 누군가 API를 통해 결정을 제출하면 Lambda는 함수를 다시 간접 호출하고 체크포인트에서 다시 재생하여 승인 결과를 계속 진행합니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
콜백이 수신되고 함수가 재개되면 처음부터 재생됩니다. prepare-document 단계에서 체크포인트가 지정된 결과를 즉시 반환합니다. 또한 waitForCallback 작업은 다시 기다리는 대신 저장된 승인 결과를 즉시 반환합니다. 이후 실행은 finalize 또는 archive 단계로 계속됩니다.
다중 스테이지 데이터 파이프라인
스테이지 간 체크포인트를 사용하는 추출, 변환 및 로드 단계를 통해 대규모 데이터세트를 처리합니다. 각 스테이지를 완료하는 데 몇 시간이 걸릴 수 있고, 체크포인트는 중단된 경우 임의의 스테이지에서 파이프라인을 재개할 수 있도록 합니다.
이 패턴은 중간에 복구 지점이 있는 스테이지에서 데이터를 처리해야 하는 ETL 워크플로, 데이터 마이그레이션 또는 배치 처리 작업에 적합합니다. 스테이지가 실패하면 파이프라인은 처음부터 다시 시작하지 않고 마지막으로 완료된 스테이지에서 다시 시작됩니다. 대기 작업을 사용하여 스테이지 간에 일시 중지할 수도 있습니다. 즉, 속도 제한을 준수하거나, 다운스트림 시스템이 준비될 때까지 기다리거나, 사용량이 적은 시간에 처리를 예약할 수 있습니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
각 스테이지는 단계로 래핑되어 중단될 경우 임의의 스테이지에서 파이프라인을 재개할 수 있는 체크포인트를 생성합니다. 추출과 변환 사이의 5분 대기는 컴퓨팅 리소스를 소비하지 않으면서 소스 시스템 속도 제한을 준수하고, 오전 2시까지 대기하면 사용량이 적은 시간에 비용이 많이 드는 로드 작업이 예약됩니다.
Note
new Date() 직접 호출 및 calculateMsUntilHour() 함수는 단계를 벗어나며 재생 중에 다시 실행됩니다. 재생 간에 일관성이 있어야 하는 시간 기반 작업의 경우 단계 내의 타임스탬프를 계산하거나 대기 기간(체크포인트 지정됨)에만 사용합니다.
함수 간 연쇄 간접 호출
context.invoke()를 사용하여 지속성 함수 내에서 다른 Lambda 함수를 간접 호출합니다. 호출 함수는 간접 호출된 함수가 완료될 때까지 기다리는 동안 일시 중지되어 결과를 보존하는 체크포인트를 생성합니다. 간접 호출된 함수가 완료된 후 호출 함수가 중단되면 함수를 다시 간접 호출하지 않고 저장된 결과로 재개됩니다.
특정 도메인(고객 검증, 결제 처리, 재고 관리)을 처리하고 워크플로에서 조정해야 하는 특정 함수가 있는 경우 이 패턴을 사용합니다. 각 함수는 자체 로직을 유지하고 코드 중복을 방지하기 위해 여러 오케스트레이터 함수에 의해 간접 호출될 수 있습니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
각 간접 호출로 오케스트레이터 함수에 체크포인트가 생성됩니다. 고객 검증이 완료된 후 오케스트레이터가 중단되면 해당 체크포인트에서 저장된 고객 데이터로 재개되어 검증 간접 호출을 건너뜁니다. 이를 통해 다운스트림 서비스에 대한 중복 호출이 방지되고 중단 시에도 일관적인 실행이 보장될 수 있습니다.
간접 호출된 함수는 지속성 또는 표준 Lambda 함수일 수 있습니다. 지속성 함수를 간접 호출하면 대기 및 체크포인트가 있는 자체 다단계 워크플로가 있을 수 있습니다. 오케스트레이터는 전체 지속성 실행이 완료될 때까지 기다렸다가 최종 결과를 수신합니다.
교차 계정 간접 호출은 지원되지 않습니다. 간접 호출된 모든 함수는 직접 호출 함수와 동일한 AWS 계정에 있어야 합니다.
고급 패턴
지속성 함수를 사용하여 여러 지속성 작업, 병렬 실행, 배열 처리, 조건부 로직 및 폴링을 결합하는 복잡한 다단계 애플리케이션을 빌드합니다. 이러한 패턴을 사용하면 내결함성 및 자동 복구를 유지하면서 많은 태스크를 조정하는 정교한 애플리케이션을 빌드할 수 있습니다.
고급 패턴은 단순한 순차적 단계 이상입니다. parallel()로 동시에 작업을 실행하고, map()을 사용하여 배열을 처리하고, waitForCondition()을 사용하여 외부 조건을 기다리고, 이러한 기본 요소를 결합하여 신뢰할 수 있는 애플리케이션을 빌드할 수 있습니다. 각각의 지속성 작업은 자체 체크포인트를 생성하므로 중단된 경우 언제든지 애플리케이션을 복구할 수 있습니다.
사용자 온보딩 프로세스
재시도 처리를 통해 사용자에게 등록, 이메일 확인, 프로필, 설정, 최초 구성을 안내합니다. 이 예제에서는 순차적 단계, 콜백 및 조건부 로직을 결합하여 전체 온보딩 프로세스를 생성합니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
이 프로세스는 계정 생성 및 이메일 전송을 위해 체크포인트와 순차적 단계를 결합한 다음 최대 48시간 동안 일시 중지되어 리소스를 소비하지 않고 이메일 확인을 기다립니다. 조건부 로직은 확인 완료 또는 제한 시간 초과 여부에 따라 다양한 경로를 처리합니다. 프로필 설정 태스크는 병렬 작업을 사용하여 동시에 실행되어 총 실행 시간을 줄이고, 각 단계는 안정적인 온보딩을 위해 일시적 실패 시 자동으로 재시도됩니다.
체크포인트를 사용한 배치 처리
실패 후 마지막으로 성공한 체크포인트에서 자동으로 복구하여 수백만 개의 레코드를 처리합니다. 이 예제에서는 지속성 함수가 map() 작업을 청킹 및 속도 제한과 결합하여 대규모 데이터 처리를 처리하는 방법을 보여줍니다.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
레코드는 메모리 또는 다운스트림 서비스 과부하가 발생하지 않도록 관리 가능한 배치로 분할된 다음 병렬 처리를 제어하는 maxConcurrency로 동시에 여러 배치를 처리합니다. 각 배치에는 자체 체크포인트가 있으므로 실패가 발생하면 모든 레코드를 재처리하는 대신 실패한 배치만 재시도합니다. 이 패턴은 처리에 시간이 걸릴 수 있는 ETL 작업, 데이터 마이그레이션 또는 대량 작업에 적합합니다.
다음 단계
DurableContext, 단계 및 대기를 이해하기 위한 기본 개념 살펴보기
결정론적 코드 작성 및 성능 최적화 모범 사례 검토
로컬 및 클라우드에서 지속성 함수 테스트 방법 알아보기