Pemberitahuan akhir dukungan: Pada 20 Mei 2026, AWS akan mengakhiri dukungan untuk AWS IoT Events. Setelah 20 Mei 2026, Anda tidak akan lagi dapat mengakses AWS IoT Events konsol atau AWS IoT Events sumber daya. Untuk informasi selengkapnya, lihat AWS IoT Events akhir dukungan.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Prosedur migrasi untuk model detektor di AWS IoT Events
Bagian ini menjelaskan solusi alternatif yang menghadirkan fungsionalitas model detektor serupa saat Anda AWS IoT Events bermigrasi.
Anda dapat memigrasikan konsumsi data melalui AWS IoT Core aturan ke kombinasi layanan lain. AWS Alih-alih menelan data melalui BatchPutMessageAPI, data dapat dirutekan ke topik MQTT. AWS IoT Core
Pendekatan migrasi ini memanfaatkan topik AWS IoT Core MQTT sebagai titik masuk untuk data IoT Anda, menggantikan input langsung ke. AWS IoT Events Topik MQTT dipilih karena beberapa alasan utama. Mereka menawarkan kompatibilitas yang luas dengan perangkat IoT karena penggunaan MQTT yang meluas di industri. Topik-topik ini dapat menangani volume pesan yang tinggi dari berbagai perangkat, memastikan skalabilitas. Mereka juga memberikan fleksibilitas dalam perutean dan pemfilteran pesan berdasarkan konten atau jenis perangkat. Selain itu, topik AWS IoT Core MQTT terintegrasi secara mulus dengan AWS layanan lain, memfasilitasi proses migrasi.
Data mengalir dari topik MQTT ke dalam arsitektur yang menggabungkan Amazon Kinesis Data Streams, AWS Lambda fungsi, tabel Amazon DynamoDB, dan jadwal Amazon. EventBridge Kombinasi layanan ini mereplikasi dan meningkatkan fungsionalitas yang sebelumnya disediakan oleh AWS IoT Events, menawarkan Anda lebih banyak fleksibilitas dan kontrol atas jalur pemrosesan data IoT Anda.
Membandingkan arsitektur
AWS IoT Events Arsitektur saat ini menyerap data melalui AWS IoT Core aturan dan BatchPutMessage
API. Arsitektur ini digunakan AWS IoT Core untuk konsumsi data dan penerbitan acara, dengan pesan yang dirutekan melalui AWS IoT Events input ke model detektor yang menentukan logika status. Peran IAM mengelola izin yang diperlukan.
Solusi baru dipertahankan AWS IoT Core untuk konsumsi data (sekarang dengan topik MQTT input dan output khusus). Ini memperkenalkan Kinesis Data Streams untuk partisi data dan fungsi Lambda evaluator untuk logika keadaan. Status perangkat sekarang disimpan dalam tabel DynamoDB, dan peran IAM yang disempurnakan mengelola izin di seluruh layanan ini.
Tujuan | Solusi | Perbedaan |
---|---|---|
Penyerapan data - Menerima data dari perangkat IoT |
AWS IoT Core |
Sekarang membutuhkan dua topik MQTT yang berbeda: satu untuk menelan data perangkat dan satu lagi untuk menerbitkan acara keluaran |
Arah pesan - Merutekan pesan masuk ke layanan yang sesuai |
AWS IoT Core aturan perutean pesan |
Mempertahankan fungsionalitas perutean yang sama tetapi sekarang mengarahkan pesan ke Kinesis Data Streams, bukan AWS IoT Events |
Pemrosesan data - Menangani dan mengatur aliran data yang masuk |
Kinesis Data Streams |
Menggantikan fungsionalitas AWS IoT Events input, menyediakan konsumsi data dengan partisi ID perangkat untuk pemrosesan pesan |
Evaluasi logika — Memproses perubahan status dan memicu tindakan |
Penilai Lambda |
Mengganti model AWS IoT Events detektor, menyediakan evaluasi logika status yang dapat disesuaikan melalui kode alih-alih alur kerja visual |
Manajemen negara - Mempertahankan status perangkat |
Tabel DynamoDB |
Komponen baru yang menyediakan penyimpanan status perangkat yang persisten, menggantikan manajemen AWS IoT Events status internal |
Keamanan - Mengelola izin layanan |
Peran IAM |
Izin yang diperbarui sekarang mencakup akses ke Kinesis Data Streams, DynamoDB, dan selain izin yang ada EventBridge AWS IoT Core |
Langkah 1: (Opsional) konfigurasi model AWS IoT Events detektor ekspor
Sebelum membuat sumber daya baru, ekspor definisi model AWS IoT Events detektor Anda. Ini berisi logika pemrosesan acara Anda dan dapat berfungsi sebagai referensi historis untuk menerapkan solusi baru Anda.
Langkah 2: Buat peran IAM
Buat peran IAM untuk memberikan izin untuk mereplikasi fungsionalitas. AWS IoT Events Peran dalam contoh ini memberikan akses ke DynamoDB untuk manajemen status, untuk penjadwalan EventBridge, Kinesis Data Streams untuk konsumsi data, untuk memublikasikan pesan, dan untuk pencatatan. AWS IoT Core CloudWatch Bersama-sama, layanan ini berfungsi sebagai pengganti AWS IoT Events.
-
Buat peran IAM dengan izin sebagai berikut. Untuk petunjuk selengkapnya tentang cara membuat peran IAM, lihat Membuat peran untuk mendelegasikan izin ke AWS layanan di Panduan Pengguna IAM.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
Tambahkan kebijakan kepercayaan peran IAM berikut. Kebijakan kepercayaan memungkinkan AWS layanan yang ditentukan untuk mengambil peran IAM sehingga mereka dapat melakukan tindakan yang diperlukan. Untuk petunjuk selengkapnya tentang cara membuat kebijakan kepercayaan IAM, lihat Membuat peran menggunakan kebijakan kepercayaan khusus di Panduan Pengguna IAM.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
Langkah 3: Buat Amazon Kinesis Data Streams
Buat Amazon Kinesis Data Streams AWS Management Console menggunakan atau. AWS CLI
Langkah 4: Buat atau perbarui aturan perutean pesan MQTT
Anda dapat membuat aturan perutean pesan MQTT baru atau memperbarui aturan yang ada.
Langkah 5: Dapatkan titik akhir untuk topik MQTT tujuan
Gunakan topik MQTT tujuan untuk mengonfigurasi tempat topik Anda mempublikasikan pesan keluar, menggantikan fungsionalitas yang sebelumnya ditangani oleh. AWS IoT Events Titik akhir unik untuk AWS akun dan wilayah Anda.
Langkah 6: Buat tabel Amazon DynamoDB
Tabel Amazon DynamoDB menggantikan fungsionalitas AWS IoT Events manajemen status, menyediakan cara yang skalabel dan fleksibel untuk mempertahankan dan mengelola status perangkat Anda dan logika model detektor dalam arsitektur solusi baru Anda.
Langkah 7: Buat AWS Lambda fungsi (konsol)
Fungsi Lambda berfungsi sebagai mesin pemrosesan inti, menggantikan logika evaluasi model detektor. AWS IoT Events Dalam contoh, kami mengintegrasikan dengan AWS layanan lain untuk menangani data yang masuk, mengelola status, dan memicu tindakan berdasarkan aturan yang Anda tetapkan.
Buat fungsi Lambda dengan NodeJS runtime. Gunakan cuplikan kode berikut, ganti konstanta hard-code:
-
Buka AWS Lambda console
. -
Pilih Buat fungsi.
-
Masukkan nama untuk nama Fungsi.
-
Pilih NodeJS 22.x sebagai Runtime.
-
Di dropdown Ubah peran eksekusi default, pilih Gunakan peran yang ada, lalu pilih peran IAM yang Anda buat di langkah sebelumnya.
-
Pilih Buat fungsi.
-
Tempel cuplikan kode berikut setelah mengganti konstanta kode keras.
-
Setelah fungsi Anda dibuat, di bawah tab Kode, tempel contoh kode berikut, ganti
your-destination-endpoint
titik akhir dengan milik Anda sendiri.
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
Langkah 8: Tambahkan pemicu Amazon Kinesis Data Streams
Tambahkan pemicu Kinesis Data Streams ke fungsi Lambda menggunakan atau. AWS Management Console AWS CLI
Menambahkan pemicu Kinesis Data Streams ke fungsi Lambda Anda akan membuat koneksi antara pipeline konsumsi data dan logika pemrosesan Anda, memungkinkannya secara otomatis mengevaluasi aliran data IoT yang masuk dan bereaksi terhadap peristiwa secara real-time, mirip dengan cara memproses input. AWS IoT Events
Langkah 9: Uji konsumsi data dan fungsionalitas keluaran ()AWS CLI
Publikasikan payload ke topik MQTT berdasarkan apa yang Anda tentukan dalam model detektor Anda. Berikut ini adalah contoh payload ke topik MQTT your-topic-name
untuk menguji implementasi.
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
Anda akan melihat pesan MQTT yang dipublikasikan ke topik dengan konten berikut (atau serupa):
{ "state": "alarm detected, timer started" }