Contoh dan kasus penggunaan - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Contoh dan kasus penggunaan

Fungsi Lambda yang tahan lama memungkinkan Anda membangun aplikasi multi-langkah yang toleran terhadap kesalahan menggunakan operasi yang tahan lama seperti langkah dan menunggu. Dengan checkpointing otomatis dan model checkpoint-replay. Di mana eksekusi dimulai ulang dari awal setelah kegagalan tetapi melewatkan pos pemeriksaan yang selesai, fungsi Anda dapat pulih dari kegagalan dan melanjutkan eksekusi tanpa kehilangan kemajuan.

Proses toleran kesalahan berumur pendek

Gunakan fungsi yang tahan lama untuk membangun operasi andal yang biasanya selesai dalam hitungan menit. Meskipun proses ini lebih pendek dari alur kerja yang berjalan lama, mereka masih mendapat manfaat dari pemeriksaan otomatis dan toleransi kesalahan di seluruh sistem terdistribusi. Fungsi tahan lama memastikan proses multi-langkah Anda berhasil diselesaikan bahkan ketika panggilan layanan individu gagal, tanpa memerlukan penanganan kesalahan yang rumit atau kode manajemen status.

Skenario umum termasuk sistem pemesanan hotel, platform reservasi restoran, permintaan perjalanan berbagi perjalanan, pembelian tiket acara, dan peningkatan langganan SaaS. Skenario ini memiliki karakteristik umum: beberapa panggilan layanan yang harus diselesaikan bersama, kebutuhan untuk mencoba ulang otomatis pada kegagalan sementara, dan persyaratan untuk mempertahankan status konsisten di seluruh sistem terdistribusi.

Transaksi terdistribusi di seluruh layanan mikro

Mengkoordinasikan pembayaran, inventaris, dan pengiriman di beberapa layanan dengan rollback otomatis pada kegagalan. Setiap operasi layanan dibungkus dalam satu langkah, memastikan transaksi dapat pulih dari titik mana pun jika layanan gagal.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, amount, items } = event; // Reserve inventory across multiple warehouses const inventory = await context.step("reserve-inventory", async () => { return await inventoryService.reserve(items); }); // Process payment const payment = await context.step("process-payment", async () => { return await paymentService.charge(amount); }); // Create shipment const shipment = await context.step("create-shipment", async () => { return await shippingService.createShipment(orderId, inventory); }); return { orderId, status: 'completed', shipment }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] amount = event['amount'] items = event['items'] # Reserve inventory across multiple warehouses inventory = context.step( lambda _: inventory_service.reserve(items), name='reserve-inventory' ) # Process payment payment = context.step( lambda _: payment_service.charge(amount), name='process-payment' ) # Create shipment shipment = context.step( lambda _: shipping_service.create_shipment(order_id, inventory), name='create-shipment' ) return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}

Jika ada langkah yang gagal, fungsi secara otomatis mencoba ulang dari pos pemeriksaan terakhir yang berhasil. Reservasi inventaris tetap ada meskipun pemrosesan pembayaran gagal sementara. Ketika fungsi mencoba lagi, ia melewatkan langkah inventaris yang telah selesai dan langsung melanjutkan ke pemrosesan pembayaran. Ini menghilangkan reservasi duplikat dan memastikan status konsisten di seluruh sistem terdistribusi Anda.

Pemrosesan pesanan dengan beberapa langkah

Memproses pesanan melalui validasi, otorisasi pembayaran, alokasi inventaris, dan pemenuhan dengan percobaan ulang dan pemulihan otomatis. Setiap langkah diperiksa, memastikan pesanan berlanjut bahkan jika langkah individu gagal dan coba lagi.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId, items } = event; // Validate order details const validation = await context.step("validate-order", async () => { const customer = await customerService.validate(customerId); const itemsValid = await inventoryService.validateItems(items); return { customer, itemsValid }; }); if (!validation.itemsValid) { return { orderId, status: 'rejected', reason: 'invalid_items' }; } // Authorize payment const authorization = await context.step("authorize-payment", async () => { return await paymentService.authorize( validation.customer.paymentMethod, calculateTotal(items) ); }); // Allocate inventory const allocation = await context.step("allocate-inventory", async () => { return await inventoryService.allocate(items); }); // Fulfill order const fulfillment = await context.step("fulfill-order", async () => { return await fulfillmentService.createShipment({ orderId, items: allocation.allocatedItems, address: validation.customer.shippingAddress }); }); return { orderId, status: 'completed', trackingNumber: fulfillment.trackingNumber }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] items = event['items'] # Validate order details def validate_order(_): customer = customer_service.validate(customer_id) items_valid = inventory_service.validate_items(items) return {'customer': customer, 'itemsValid': items_valid} validation = context.step(validate_order, name='validate-order') if not validation['itemsValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'} # Authorize payment authorization = context.step( lambda _: payment_service.authorize( validation['customer']['paymentMethod'], calculate_total(items) ), name='authorize-payment' ) # Allocate inventory allocation = context.step( lambda _: inventory_service.allocate(items), name='allocate-inventory' ) # Fulfill order fulfillment = context.step( lambda _: fulfillment_service.create_shipment({ 'orderId': order_id, 'items': allocation['allocatedItems'], 'address': validation['customer']['shippingAddress'] }), name='fulfill-order' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': fulfillment['trackingNumber'] }

Pola ini memastikan pesanan tidak pernah terjebak dalam keadaan menengah. Jika validasi gagal, pesanan ditolak sebelum otorisasi pembayaran. Jika otorisasi pembayaran gagal, inventaris tidak dialokasikan. Setiap langkah dibangun di atas yang sebelumnya dengan percobaan ulang dan pemulihan otomatis.

Catatan

Pemeriksaan bersyarat if (!validation.itemsValid) berada di luar langkah dan akan dijalankan kembali selama pemutaran ulang. Ini aman karena deterministik—selalu menghasilkan hasil yang sama dengan objek validasi yang sama.

Proses yang berjalan lama

Gunakan fungsi tahan lama untuk proses yang berlangsung berjam-jam, berhari-hari, atau berminggu-minggu. Operasi tunggu menangguhkan eksekusi tanpa menimbulkan biaya komputasi, membuat proses yang berjalan lama menjadi hemat biaya. Selama periode tunggu, fungsi Anda berhenti berjalan dan Lambda mendaur ulang lingkungan eksekusi. Ketika tiba waktunya untuk melanjutkan, Lambda memanggil fungsi Anda lagi dan memutar ulang dari pos pemeriksaan terakhir.

Model eksekusi ini membuat fungsi tahan lama ideal untuk proses yang perlu dijeda untuk waktu yang lama, baik menunggu keputusan manusia, respons sistem eksternal, jendela pemrosesan terjadwal, atau penundaan berbasis waktu. Anda hanya membayar untuk waktu komputasi aktif, bukan untuk menunggu.

Skenario umum termasuk proses persetujuan dokumen, pemrosesan batch terjadwal, proses orientasi multi-hari, proses uji coba berlangganan, dan sistem notifikasi tertunda. Skenario ini memiliki karakteristik umum: periode tunggu yang diperpanjang yang diukur dalam jam atau hari, kebutuhan untuk mempertahankan status eksekusi di seluruh penantian tersebut, dan persyaratan yang sensitif terhadap biaya di mana membayar waktu komputasi idle sangat mahal.

Human-in-the-loop persetujuan

Jeda eksekusi untuk tinjauan dokumen, persetujuan, atau keputusan sambil mempertahankan status eksekusi. Fungsi menunggu callback eksternal tanpa menghabiskan sumber daya, dilanjutkan secara otomatis ketika persetujuan diterima.

Pola ini penting untuk proses yang membutuhkan penilaian manusia atau validasi eksternal. Fungsi ditangguhkan pada titik panggilan balik, tidak menimbulkan biaya komputasi saat menunggu. Ketika seseorang mengirimkan keputusan mereka melalui API, Lambda memanggil fungsi Anda lagi dan memutar ulang dari pos pemeriksaan, melanjutkan dengan hasil persetujuan.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { documentId, reviewers } = event; // Step 1: Prepare document for review const prepared = await context.step("prepare-document", async () => { return await documentService.prepare(documentId); }); // Step 2: Request approval with callback const approval = await context.waitForCallback( "approval-callback", async (callbackId) => { await notificationService.sendApprovalRequest({ documentId, reviewers, callbackId, expiresIn: 86400 }); }, { timeout: { seconds: 86400 } } ); // Function resumes here when approval is received if (approval?.approved) { const finalized = await context.step("finalize-document", async () => { return await documentService.finalize(documentId, approval.comments); }); return { status: 'approved', documentId, finalizedAt: finalized.timestamp }; } // Handle rejection await context.step("archive-rejected", async () => { await documentService.archive(documentId, approval?.reason); }); return { status: 'rejected', documentId, reason: approval?.reason }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig @durable_execution def lambda_handler(event, context: DurableContext): document_id = event['documentId'] reviewers = event['reviewers'] # Step 1: Prepare document for review prepared = context.step( lambda _: document_service.prepare(document_id), name='prepare-document' ) # Step 2: Request approval with callback def send_approval_request(callback_id): notification_service.send_approval_request({ 'documentId': document_id, 'reviewers': reviewers, 'callbackId': callback_id, 'expiresIn': 86400 }) approval = context.wait_for_callback( send_approval_request, name='approval-callback', config=WaitConfig(timeout=86400) ) # Function resumes here when approval is received if approval and approval.get('approved'): finalized = context.step( lambda _: document_service.finalize(document_id, approval.get('comments')), name='finalize-document' ) return { 'status': 'approved', 'documentId': document_id, 'finalizedAt': finalized['timestamp'] } # Handle rejection context.step( lambda _: document_service.archive(document_id, approval.get('reason') if approval else None), name='archive-rejected' ) return { 'status': 'rejected', 'documentId': document_id, 'reason': approval.get('reason') if approval else None }

Ketika callback diterima dan fungsi Anda dilanjutkan, itu akan diputar ulang dari awal. Langkah persiapan-dokumen mengembalikan hasil checkpoint-nya secara instan. waitForCallback Operasi juga kembali secara instan dengan hasil persetujuan yang disimpan alih-alih menunggu lagi. Eksekusi kemudian dilanjutkan ke langkah finalisasi atau arsip.

Pipa data multi-tahap

Memproses kumpulan data besar melalui fase ekstraksi, transformasi, dan pemuatan dengan pos pemeriksaan antar tahapan. Setiap tahap dapat memakan waktu berjam-jam untuk diselesaikan, dan pos pemeriksaan memastikan pipa dapat dilanjutkan dari tahap apa pun jika terputus.

Pola ini sangat ideal untuk alur kerja ETL, migrasi data, atau pekerjaan pemrosesan batch di mana Anda perlu memproses data secara bertahap dengan titik pemulihan di antaranya. Jika tahap gagal, pipa dilanjutkan dari tahap selesai terakhir daripada memulai kembali dari awal. Anda juga dapat menggunakan operasi tunggu untuk berhenti di antara tahapan; menghormati batas tarif, menunggu sistem hilir siap, atau menjadwalkan pemrosesan selama jam-jam sibuk.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize } = event; // Stage 1: Extract data from source const extracted = await context.step("extract-data", async () => { const records = await sourceDatabase.extractRecords(datasetId); return { recordCount: records.length, records }; }); // Wait 5 minutes to respect source system rate limits await context.wait({ seconds: 300 }); // Stage 2: Transform data in batches const transformed = await context.step("transform-data", async () => { const batches = chunkArray(extracted.records, batchSize); const results = []; for (const batch of batches) { const transformed = await transformService.processBatch(batch); results.push(transformed); } return { batchCount: batches.length, results }; }); // Wait until off-peak hours (e.g., 2 AM) const now = new Date(); const targetHour = 2; const msUntilTarget = calculateMsUntilHour(now, targetHour); await context.wait({ seconds: Math.floor(msUntilTarget / 1000) }); // Stage 3: Load data to destination const loaded = await context.step("load-data", async () => { let loadedCount = 0; for (const result of transformed.results) { await destinationDatabase.loadBatch(result); loadedCount += result.length; } return { loadedCount }; }); // Stage 4: Verify and finalize const verified = await context.step("verify-pipeline", async () => { const verification = await destinationDatabase.verifyRecords(datasetId); await pipelineService.markComplete(datasetId, verification); return verification; }); return { datasetId, recordsProcessed: extracted.recordCount, batchesProcessed: transformed.batchCount, recordsLoaded: loaded.loadedCount, verified: verified.success }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event['batchSize'] # Stage 1: Extract data from source def extract_data(_): records = source_database.extract_records(dataset_id) return {'recordCount': len(records), 'records': records} extracted = context.step(extract_data, name='extract-data') # Wait 5 minutes to respect source system rate limits context.wait(300) # Stage 2: Transform data in batches def transform_data(_): batches = chunk_array(extracted['records'], batch_size) results = [] for batch in batches: transformed = transform_service.process_batch(batch) results.append(transformed) return {'batchCount': len(batches), 'results': results} transformed = context.step(transform_data, name='transform-data') # Wait until off-peak hours (e.g., 2 AM) now = datetime.now() target_hour = 2 ms_until_target = calculate_ms_until_hour(now, target_hour) context.wait(ms_until_target // 1000) # Stage 3: Load data to destination def load_data(_): loaded_count = 0 for result in transformed['results']: destination_database.load_batch(result) loaded_count += len(result) return {'loadedCount': loaded_count} loaded = context.step(load_data, name='load-data') # Stage 4: Verify and finalize def verify_pipeline(_): verification = destination_database.verify_records(dataset_id) pipeline_service.mark_complete(dataset_id, verification) return verification verified = context.step(verify_pipeline, name='verify-pipeline') return { 'datasetId': dataset_id, 'recordsProcessed': extracted['recordCount'], 'batchesProcessed': transformed['batchCount'], 'recordsLoaded': loaded['loadedCount'], 'verified': verified['success'] }

Setiap tahap dibungkus dalam satu langkah, menciptakan pos pemeriksaan yang memungkinkan pipa untuk melanjutkan dari tahap apa pun jika terputus. Penantian 5 menit antara ekstrak dan transformasi menghormati batas laju sistem sumber tanpa mengkonsumsi sumber daya komputasi, sementara menunggu hingga jam 2 pagi menjadwalkan operasi beban yang mahal selama jam-jam di luar sibuk.

Catatan

new Date()Panggilan dan calculateMsUntilHour() fungsi berada di luar langkah dan akan dijalankan kembali selama pemutaran ulang. Untuk operasi berbasis waktu yang harus konsisten di seluruh tayangan ulang, hitung stempel waktu di dalam langkah atau gunakan hanya untuk durasi tunggu (yang diperiksa).

Pemanggilan berantai di seluruh fungsi

Memanggil fungsi Lambda lainnya dari dalam fungsi tahan lama menggunakan. context.invoke() Fungsi pemanggilan ditangguhkan sambil menunggu fungsi yang dipanggil selesai, membuat pos pemeriksaan yang mempertahankan hasilnya. Jika fungsi pemanggilan terputus setelah fungsi yang dipanggil selesai, fungsi tersebut dilanjutkan dengan hasil yang disimpan tanpa memanggil kembali fungsi tersebut.

Gunakan pola ini ketika Anda memiliki fungsi khusus yang menangani domain tertentu (validasi pelanggan, pemrosesan pembayaran, manajemen inventaris) dan perlu mengoordinasikannya dalam alur kerja. Setiap fungsi mempertahankan logikanya sendiri dan dapat dipanggil oleh beberapa fungsi orkestrator, menghindari duplikasi kode.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; // Main orchestrator function export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId } = event; // Step 1: Validate customer by invoking customer service function const customer = await context.invoke( "validate-customer", "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1", { customerId } ); if (!customer.isValid) { return { orderId, status: "rejected", reason: "invalid_customer" }; } // Step 2: Check inventory by invoking inventory service function const inventory = await context.invoke( "check-inventory", "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1", { orderId, items: event.items } ); if (!inventory.available) { return { orderId, status: "rejected", reason: "insufficient_inventory" }; } // Step 3: Process payment by invoking payment service function const payment = await context.invoke( "process-payment", "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1", { customerId, amount: inventory.totalAmount, paymentMethod: customer.paymentMethod } ); // Step 4: Create shipment by invoking fulfillment service function const shipment = await context.invoke( "create-shipment", "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1", { orderId, items: inventory.allocatedItems, address: customer.shippingAddress } ); return { orderId, status: "completed", trackingNumber: shipment.trackingNumber, estimatedDelivery: shipment.estimatedDelivery }; } );
Python
from aws_durable_execution_sdk_python import DurableContext, durable_execution # Main orchestrator function @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] # Step 1: Validate customer by invoking customer service function customer = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1', {'customerId': customer_id}, name='validate-customer' ) if not customer['isValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'} # Step 2: Check inventory by invoking inventory service function inventory = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1', {'orderId': order_id, 'items': event['items']}, name='check-inventory' ) if not inventory['available']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'} # Step 3: Process payment by invoking payment service function payment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1', { 'customerId': customer_id, 'amount': inventory['totalAmount'], 'paymentMethod': customer['paymentMethod'] }, name='process-payment' ) # Step 4: Create shipment by invoking fulfillment service function shipment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1', { 'orderId': order_id, 'items': inventory['allocatedItems'], 'address': customer['shippingAddress'] }, name='create-shipment' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': shipment['trackingNumber'], 'estimatedDelivery': shipment['estimatedDelivery'] }

Setiap pemanggilan membuat pos pemeriksaan dalam fungsi orkestrator. Jika orkestrator terganggu setelah validasi pelanggan selesai, ia melanjutkan dari pos pemeriksaan itu dengan data pelanggan yang disimpan, melewatkan pemanggilan validasi. Ini mencegah panggilan duplikat ke layanan hilir dan memastikan eksekusi yang konsisten di seluruh interupsi.

Fungsi yang dipanggil dapat berupa fungsi Lambda yang tahan lama atau standar. Jika Anda menjalankan fungsi yang tahan lama, ia dapat memiliki alur kerja multi-langkah sendiri dengan menunggu dan pos pemeriksaan. Orkestrator hanya menunggu eksekusi yang tahan lama selesai, menerima hasil akhir.

catatan

Pemanggilan lintas akun tidak didukung. Semua fungsi yang dipanggil harus berada di AWS akun yang sama dengan fungsi pemanggilan.

Pola lanjutan

Gunakan fungsi tahan lama untuk membangun aplikasi multi-langkah kompleks yang menggabungkan beberapa operasi tahan lama, eksekusi paralel, pemrosesan array, logika bersyarat, dan polling. Pola ini memungkinkan Anda membangun aplikasi canggih yang mengoordinasikan banyak tugas sambil mempertahankan toleransi kesalahan dan pemulihan otomatis.

Pola lanjutan melampaui langkah-langkah berurutan sederhana. Anda dapat menjalankan operasi secara bersamaan denganparallel(), memproses array denganmap(), menunggu kondisi eksternal denganwaitForCondition(), dan menggabungkan primitif ini untuk membangun aplikasi yang andal. Setiap operasi yang tahan lama membuat pos pemeriksaannya sendiri, sehingga aplikasi Anda dapat pulih dari titik mana pun jika terputus.

Proses orientasi pengguna

Memandu pengguna melalui pendaftaran, verifikasi email, pengaturan profil, dan konfigurasi awal dengan penanganan coba lagi. Contoh ini menggabungkan langkah-langkah berurutan, callback, dan logika bersyarat untuk membuat proses orientasi yang lengkap.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { userId, email } = event; // Step 1: Create user account const user = await context.step("create-account", async () => { return await userService.createAccount(userId, email); }); // Step 2: Send verification email await context.step("send-verification", async () => { return await emailService.sendVerification(email); }); // Step 3: Wait for email verification (up to 48 hours) const verified = await context.waitForCallback( "email-verification", async (callbackId) => { await notificationService.sendVerificationLink({ email, callbackId, expiresIn: 172800 }); }, { timeout: { seconds: 172800 } } ); if (!verified) { await context.step("send-reminder", async () => { await emailService.sendReminder(email); }); return { status: "verification_timeout", userId, message: "Email verification not completed within 48 hours" }; } // Step 4: Initialize user profile in parallel const setupResults = await context.parallel("profile-setup", [ async (ctx: DurableContext) => { return await ctx.step("create-preferences", async () => { return await preferencesService.createDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("setup-notifications", async () => { return await notificationService.setupDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("create-welcome-content", async () => { return await contentService.createWelcome(userId); }); } ]); // Step 5: Send welcome email await context.step("send-welcome", async () => { const [preferences, notifications, content] = setupResults.getResults(); return await emailService.sendWelcome({ email, preferences, notifications, content }); }); return { status: "onboarding_complete", userId, completedAt: new Date().toISOString() }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): user_id = event['userId'] email = event['email'] # Step 1: Create user account user = context.step( lambda _: user_service.create_account(user_id, email), name='create-account' ) # Step 2: Send verification email context.step( lambda _: email_service.send_verification(email), name='send-verification' ) # Step 3: Wait for email verification (up to 48 hours) def send_verification_link(callback_id): notification_service.send_verification_link({ 'email': email, 'callbackId': callback_id, 'expiresIn': 172800 }) verified = context.wait_for_callback( send_verification_link, name='email-verification', config=WaitConfig(timeout=172800) ) if not verified: context.step( lambda _: email_service.send_reminder(email), name='send-reminder' ) return { 'status': 'verification_timeout', 'userId': user_id, 'message': 'Email verification not completed within 48 hours' } # Step 4: Initialize user profile in parallel def create_preferences(ctx: DurableContext): return ctx.step( lambda _: preferences_service.create_defaults(user_id), name='create-preferences' ) def setup_notifications(ctx: DurableContext): return ctx.step( lambda _: notification_service.setup_defaults(user_id), name='setup-notifications' ) def create_welcome_content(ctx: DurableContext): return ctx.step( lambda _: content_service.create_welcome(user_id), name='create-welcome-content' ) setup_results = context.parallel( [create_preferences, setup_notifications, create_welcome_content], name='profile-setup' ) # Step 5: Send welcome email def send_welcome(_): results = setup_results.get_results() preferences, notifications, content = results[0], results[1], results[2] return email_service.send_welcome({ 'email': email, 'preferences': preferences, 'notifications': notifications, 'content': content }) context.step(send_welcome, name='send-welcome') return { 'status': 'onboarding_complete', 'userId': user_id, 'completedAt': datetime.now().isoformat() }

Proses ini menggabungkan langkah-langkah berurutan dengan pos pemeriksaan untuk pembuatan akun dan pengiriman email, kemudian berhenti hingga 48 jam menunggu verifikasi email tanpa menghabiskan sumber daya. Logika bersyarat menangani jalur yang berbeda berdasarkan apakah verifikasi selesai atau habis waktu. Tugas penyiapan profil berjalan secara bersamaan menggunakan operasi paralel untuk mengurangi total waktu eksekusi, dan setiap langkah mencoba ulang secara otomatis pada kegagalan sementara untuk memastikan orientasi selesai dengan andal.

Pemrosesan batch dengan pos pemeriksaan

Memproses jutaan catatan dengan pemulihan otomatis dari pos pemeriksaan terakhir yang berhasil setelah kegagalan. Contoh ini menunjukkan bagaimana fungsi tahan lama menggabungkan map() operasi dengan chunking dan pembatasan laju untuk menangani pemrosesan data skala besar.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; interface Batch { batchIndex: number; recordIds: string[]; } export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize = 1000 } = event; // Step 1: Get all record IDs to process const recordIds = await context.step("fetch-record-ids", async () => { return await dataService.getRecordIds(datasetId); }); // Step 2: Split into batches const batches: Batch[] = []; for (let i = 0; i < recordIds.length; i += batchSize) { batches.push({ batchIndex: Math.floor(i / batchSize), recordIds: recordIds.slice(i, i + batchSize) }); } // Step 3: Process batches with controlled concurrency const batchResults = await context.map( "process-batches", batches, async (ctx: DurableContext, batch: Batch, index: number) => { const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => { const results = []; for (const recordId of batch.recordIds) { const result = await recordService.process(recordId); results.push(result); } return results; }); const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => { return await validationService.validateBatch(processed); }); return { batchIndex: batch.batchIndex, recordCount: batch.recordIds.length, successCount: validated.successCount, failureCount: validated.failureCount }; }, { maxConcurrency: 5 } ); // Step 4: Aggregate results const summary = await context.step("aggregate-results", async () => { const results = batchResults.getResults(); const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0); const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0); return { datasetId, totalRecords: recordIds.length, batchesProcessed: batches.length, successCount: totalSuccess, failureCount: totalFailure, completedAt: new Date().toISOString() }; }); return summary; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig from datetime import datetime from typing import List, Dict @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event.get('batchSize', 1000) # Step 1: Get all record IDs to process record_ids = context.step( lambda _: data_service.get_record_ids(dataset_id), name='fetch-record-ids' ) # Step 2: Split into batches batches = [] for i in range(0, len(record_ids), batch_size): batches.append({ 'batchIndex': i // batch_size, 'recordIds': record_ids[i:i + batch_size] }) # Step 3: Process batches with controlled concurrency def process_batch(ctx: DurableContext, batch: Dict, index: int): batch_index = batch['batchIndex'] def process_records(_): results = [] for record_id in batch['recordIds']: result = record_service.process(record_id) results.append(result) return results processed = ctx.step(process_records, name=f'batch-{batch_index}') validated = ctx.step( lambda _: validation_service.validate_batch(processed), name=f'validate-{batch_index}' ) return { 'batchIndex': batch_index, 'recordCount': len(batch['recordIds']), 'successCount': validated['successCount'], 'failureCount': validated['failureCount'] } batch_results = context.map( process_batch, batches, name='process-batches', config=MapConfig(max_concurrency=5) ) # Step 4: Aggregate results def aggregate_results(_): results = batch_results.get_results() total_success = sum(r['successCount'] for r in results) total_failure = sum(r['failureCount'] for r in results) return { 'datasetId': dataset_id, 'totalRecords': len(record_ids), 'batchesProcessed': len(batches), 'successCount': total_success, 'failureCount': total_failure, 'completedAt': datetime.now().isoformat() } summary = context.step(aggregate_results, name='aggregate-results') return summary

Catatan dibagi menjadi batch yang dapat dikelola untuk menghindari memori yang berlebihan atau layanan hilir, kemudian beberapa batch diproses secara bersamaan dengan mengendalikan paralelisme. maxConcurrency Setiap batch memiliki pos pemeriksaannya sendiri, jadi kegagalan hanya mencoba lagi batch yang gagal daripada memproses ulang semua catatan. Pola ini sangat ideal untuk pekerjaan ETL, migrasi data, atau operasi massal di mana pemrosesan dapat memakan waktu berjam-jam.

Langkah selanjutnya