Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Contoh dan kasus penggunaan
Fungsi Lambda yang tahan lama memungkinkan Anda membangun aplikasi multi-langkah yang toleran terhadap kesalahan menggunakan operasi yang tahan lama seperti langkah dan menunggu. Dengan checkpointing otomatis dan model checkpoint-replay. Di mana eksekusi dimulai ulang dari awal setelah kegagalan tetapi melewatkan pos pemeriksaan yang selesai, fungsi Anda dapat pulih dari kegagalan dan melanjutkan eksekusi tanpa kehilangan kemajuan.
Proses toleran kesalahan berumur pendek
Gunakan fungsi yang tahan lama untuk membangun operasi andal yang biasanya selesai dalam hitungan menit. Meskipun proses ini lebih pendek dari alur kerja yang berjalan lama, mereka masih mendapat manfaat dari pemeriksaan otomatis dan toleransi kesalahan di seluruh sistem terdistribusi. Fungsi tahan lama memastikan proses multi-langkah Anda berhasil diselesaikan bahkan ketika panggilan layanan individu gagal, tanpa memerlukan penanganan kesalahan yang rumit atau kode manajemen status.
Skenario umum termasuk sistem pemesanan hotel, platform reservasi restoran, permintaan perjalanan berbagi perjalanan, pembelian tiket acara, dan peningkatan langganan SaaS. Skenario ini memiliki karakteristik umum: beberapa panggilan layanan yang harus diselesaikan bersama, kebutuhan untuk mencoba ulang otomatis pada kegagalan sementara, dan persyaratan untuk mempertahankan status konsisten di seluruh sistem terdistribusi.
Transaksi terdistribusi di seluruh layanan mikro
Mengkoordinasikan pembayaran, inventaris, dan pengiriman di beberapa layanan dengan rollback otomatis pada kegagalan. Setiap operasi layanan dibungkus dalam satu langkah, memastikan transaksi dapat pulih dari titik mana pun jika layanan gagal.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
Jika ada langkah yang gagal, fungsi secara otomatis mencoba ulang dari pos pemeriksaan terakhir yang berhasil. Reservasi inventaris tetap ada meskipun pemrosesan pembayaran gagal sementara. Ketika fungsi mencoba lagi, ia melewatkan langkah inventaris yang telah selesai dan langsung melanjutkan ke pemrosesan pembayaran. Ini menghilangkan reservasi duplikat dan memastikan status konsisten di seluruh sistem terdistribusi Anda.
Pemrosesan pesanan dengan beberapa langkah
Memproses pesanan melalui validasi, otorisasi pembayaran, alokasi inventaris, dan pemenuhan dengan percobaan ulang dan pemulihan otomatis. Setiap langkah diperiksa, memastikan pesanan berlanjut bahkan jika langkah individu gagal dan coba lagi.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
Pola ini memastikan pesanan tidak pernah terjebak dalam keadaan menengah. Jika validasi gagal, pesanan ditolak sebelum otorisasi pembayaran. Jika otorisasi pembayaran gagal, inventaris tidak dialokasikan. Setiap langkah dibangun di atas yang sebelumnya dengan percobaan ulang dan pemulihan otomatis.
Catatan
Pemeriksaan bersyarat if (!validation.itemsValid) berada di luar langkah dan akan dijalankan kembali selama pemutaran ulang. Ini aman karena deterministik—selalu menghasilkan hasil yang sama dengan objek validasi yang sama.
Proses yang berjalan lama
Gunakan fungsi tahan lama untuk proses yang berlangsung berjam-jam, berhari-hari, atau berminggu-minggu. Operasi tunggu menangguhkan eksekusi tanpa menimbulkan biaya komputasi, membuat proses yang berjalan lama menjadi hemat biaya. Selama periode tunggu, fungsi Anda berhenti berjalan dan Lambda mendaur ulang lingkungan eksekusi. Ketika tiba waktunya untuk melanjutkan, Lambda memanggil fungsi Anda lagi dan memutar ulang dari pos pemeriksaan terakhir.
Model eksekusi ini membuat fungsi tahan lama ideal untuk proses yang perlu dijeda untuk waktu yang lama, baik menunggu keputusan manusia, respons sistem eksternal, jendela pemrosesan terjadwal, atau penundaan berbasis waktu. Anda hanya membayar untuk waktu komputasi aktif, bukan untuk menunggu.
Skenario umum termasuk proses persetujuan dokumen, pemrosesan batch terjadwal, proses orientasi multi-hari, proses uji coba berlangganan, dan sistem notifikasi tertunda. Skenario ini memiliki karakteristik umum: periode tunggu yang diperpanjang yang diukur dalam jam atau hari, kebutuhan untuk mempertahankan status eksekusi di seluruh penantian tersebut, dan persyaratan yang sensitif terhadap biaya di mana membayar waktu komputasi idle sangat mahal.
Human-in-the-loop persetujuan
Jeda eksekusi untuk tinjauan dokumen, persetujuan, atau keputusan sambil mempertahankan status eksekusi. Fungsi menunggu callback eksternal tanpa menghabiskan sumber daya, dilanjutkan secara otomatis ketika persetujuan diterima.
Pola ini penting untuk proses yang membutuhkan penilaian manusia atau validasi eksternal. Fungsi ditangguhkan pada titik panggilan balik, tidak menimbulkan biaya komputasi saat menunggu. Ketika seseorang mengirimkan keputusan mereka melalui API, Lambda memanggil fungsi Anda lagi dan memutar ulang dari pos pemeriksaan, melanjutkan dengan hasil persetujuan.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
Ketika callback diterima dan fungsi Anda dilanjutkan, itu akan diputar ulang dari awal. Langkah persiapan-dokumen mengembalikan hasil checkpoint-nya secara instan. waitForCallback Operasi juga kembali secara instan dengan hasil persetujuan yang disimpan alih-alih menunggu lagi. Eksekusi kemudian dilanjutkan ke langkah finalisasi atau arsip.
Pipa data multi-tahap
Memproses kumpulan data besar melalui fase ekstraksi, transformasi, dan pemuatan dengan pos pemeriksaan antar tahapan. Setiap tahap dapat memakan waktu berjam-jam untuk diselesaikan, dan pos pemeriksaan memastikan pipa dapat dilanjutkan dari tahap apa pun jika terputus.
Pola ini sangat ideal untuk alur kerja ETL, migrasi data, atau pekerjaan pemrosesan batch di mana Anda perlu memproses data secara bertahap dengan titik pemulihan di antaranya. Jika tahap gagal, pipa dilanjutkan dari tahap selesai terakhir daripada memulai kembali dari awal. Anda juga dapat menggunakan operasi tunggu untuk berhenti di antara tahapan; menghormati batas tarif, menunggu sistem hilir siap, atau menjadwalkan pemrosesan selama jam-jam sibuk.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
Setiap tahap dibungkus dalam satu langkah, menciptakan pos pemeriksaan yang memungkinkan pipa untuk melanjutkan dari tahap apa pun jika terputus. Penantian 5 menit antara ekstrak dan transformasi menghormati batas laju sistem sumber tanpa mengkonsumsi sumber daya komputasi, sementara menunggu hingga jam 2 pagi menjadwalkan operasi beban yang mahal selama jam-jam di luar sibuk.
Catatan
new Date()Panggilan dan calculateMsUntilHour() fungsi berada di luar langkah dan akan dijalankan kembali selama pemutaran ulang. Untuk operasi berbasis waktu yang harus konsisten di seluruh tayangan ulang, hitung stempel waktu di dalam langkah atau gunakan hanya untuk durasi tunggu (yang diperiksa).
Pemanggilan berantai di seluruh fungsi
Memanggil fungsi Lambda lainnya dari dalam fungsi tahan lama menggunakan. context.invoke() Fungsi pemanggilan ditangguhkan sambil menunggu fungsi yang dipanggil selesai, membuat pos pemeriksaan yang mempertahankan hasilnya. Jika fungsi pemanggilan terputus setelah fungsi yang dipanggil selesai, fungsi tersebut dilanjutkan dengan hasil yang disimpan tanpa memanggil kembali fungsi tersebut.
Gunakan pola ini ketika Anda memiliki fungsi khusus yang menangani domain tertentu (validasi pelanggan, pemrosesan pembayaran, manajemen inventaris) dan perlu mengoordinasikannya dalam alur kerja. Setiap fungsi mempertahankan logikanya sendiri dan dapat dipanggil oleh beberapa fungsi orkestrator, menghindari duplikasi kode.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
Setiap pemanggilan membuat pos pemeriksaan dalam fungsi orkestrator. Jika orkestrator terganggu setelah validasi pelanggan selesai, ia melanjutkan dari pos pemeriksaan itu dengan data pelanggan yang disimpan, melewatkan pemanggilan validasi. Ini mencegah panggilan duplikat ke layanan hilir dan memastikan eksekusi yang konsisten di seluruh interupsi.
Fungsi yang dipanggil dapat berupa fungsi Lambda yang tahan lama atau standar. Jika Anda menjalankan fungsi yang tahan lama, ia dapat memiliki alur kerja multi-langkah sendiri dengan menunggu dan pos pemeriksaan. Orkestrator hanya menunggu eksekusi yang tahan lama selesai, menerima hasil akhir.
Pemanggilan lintas akun tidak didukung. Semua fungsi yang dipanggil harus berada di AWS akun yang sama dengan fungsi pemanggilan.
Pola lanjutan
Gunakan fungsi tahan lama untuk membangun aplikasi multi-langkah kompleks yang menggabungkan beberapa operasi tahan lama, eksekusi paralel, pemrosesan array, logika bersyarat, dan polling. Pola ini memungkinkan Anda membangun aplikasi canggih yang mengoordinasikan banyak tugas sambil mempertahankan toleransi kesalahan dan pemulihan otomatis.
Pola lanjutan melampaui langkah-langkah berurutan sederhana. Anda dapat menjalankan operasi secara bersamaan denganparallel(), memproses array denganmap(), menunggu kondisi eksternal denganwaitForCondition(), dan menggabungkan primitif ini untuk membangun aplikasi yang andal. Setiap operasi yang tahan lama membuat pos pemeriksaannya sendiri, sehingga aplikasi Anda dapat pulih dari titik mana pun jika terputus.
Proses orientasi pengguna
Memandu pengguna melalui pendaftaran, verifikasi email, pengaturan profil, dan konfigurasi awal dengan penanganan coba lagi. Contoh ini menggabungkan langkah-langkah berurutan, callback, dan logika bersyarat untuk membuat proses orientasi yang lengkap.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
Proses ini menggabungkan langkah-langkah berurutan dengan pos pemeriksaan untuk pembuatan akun dan pengiriman email, kemudian berhenti hingga 48 jam menunggu verifikasi email tanpa menghabiskan sumber daya. Logika bersyarat menangani jalur yang berbeda berdasarkan apakah verifikasi selesai atau habis waktu. Tugas penyiapan profil berjalan secara bersamaan menggunakan operasi paralel untuk mengurangi total waktu eksekusi, dan setiap langkah mencoba ulang secara otomatis pada kegagalan sementara untuk memastikan orientasi selesai dengan andal.
Pemrosesan batch dengan pos pemeriksaan
Memproses jutaan catatan dengan pemulihan otomatis dari pos pemeriksaan terakhir yang berhasil setelah kegagalan. Contoh ini menunjukkan bagaimana fungsi tahan lama menggabungkan map() operasi dengan chunking dan pembatasan laju untuk menangani pemrosesan data skala besar.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
Catatan dibagi menjadi batch yang dapat dikelola untuk menghindari memori yang berlebihan atau layanan hilir, kemudian beberapa batch diproses secara bersamaan dengan mengendalikan paralelisme. maxConcurrency Setiap batch memiliki pos pemeriksaannya sendiri, jadi kegagalan hanya mencoba lagi batch yang gagal daripada memproses ulang semua catatan. Pola ini sangat ideal untuk pekerjaan ETL, migrasi data, atau operasi massal di mana pemrosesan dapat memakan waktu berjam-jam.
Langkah selanjutnya