Avviso di fine del supporto: il 20 maggio 2026, AWS terminerà il supporto per AWS IoT Events. Dopo il 20 maggio 2026, non potrai più accedere alla AWS IoT Events console o AWS IoT Events alle risorse. Per ulteriori informazioni, consulta AWS IoT Events Fine del supporto.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Procedura di migrazione per i modelli di rilevatori in AWS IoT Events
Questa sezione descrive soluzioni alternative che offrono funzionalità simili a quelle dei modelli di rilevatori durante la migrazione. AWS IoT Events
È possibile migrare l'ingestione dei dati tramite AWS IoT Core regole a una combinazione di altri servizi. AWS Invece di importare i dati tramite l'BatchPutMessageAPI, i dati possono essere indirizzati all'argomento MQTT. AWS IoT Core
Questo approccio di migrazione sfrutta gli argomenti AWS IoT Core MQTT come punto di ingresso per i dati IoT, sostituendo l'input diretto a. AWS IoT Events Gli argomenti MQTT vengono scelti per diversi motivi principali. Offrono un'ampia compatibilità con i dispositivi IoT grazie all'uso diffuso di MQTT nel settore. Questi argomenti possono gestire elevati volumi di messaggi da numerosi dispositivi, garantendo la scalabilità. Offrono inoltre flessibilità nel routing e nel filtraggio dei messaggi in base al contenuto o al tipo di dispositivo. Inoltre, gli argomenti AWS IoT Core MQTT si integrano perfettamente con altri AWS servizi, facilitando il processo di migrazione.
I dati fluiscono da argomenti MQTT in un'architettura che combina Amazon Kinesis Data Streams, AWS Lambda una funzione, una tabella Amazon DynamoDB e pianificazioni Amazon. EventBridge Questa combinazione di servizi replica e migliora le funzionalità precedentemente fornite da AWS IoT Events, offrendoti maggiore flessibilità e controllo sulla pipeline di elaborazione dei dati IoT.
Architetture a confronto
L' AWS IoT Events architettura attuale inserisce i dati tramite una AWS IoT Core regola e l'API. BatchPutMessage
Questa architettura viene utilizzata AWS IoT Core per l'inserimento dei dati e la pubblicazione degli eventi, con i messaggi instradati attraverso AWS IoT Events gli input ai modelli di rilevatori che definiscono la logica di stato. Un ruolo IAM gestisce le autorizzazioni necessarie.
La nuova soluzione consente l'inserimento AWS IoT Core dei dati (ora con argomenti MQTT di input e output dedicati). Introduce Kinesis Data Streams per il partizionamento dei dati e una funzione Lambda di valutazione per la logica di stato. Gli stati dei dispositivi sono ora archiviati in una tabella DynamoDB e un ruolo IAM avanzato gestisce le autorizzazioni su questi servizi.
Scopo | Soluzione | Differenze |
---|---|---|
Inserimento di dati: riceve dati da dispositivi IoT |
AWS IoT Core |
Ora richiede due argomenti MQTT distinti: uno per l'acquisizione dei dati del dispositivo e un altro per la pubblicazione degli eventi di output |
Direzione dei messaggi: indirizza i messaggi in arrivo ai servizi appropriati |
AWS IoT Core regola di routing dei messaggi |
Mantiene la stessa funzionalità di routing ma ora indirizza i messaggi a Kinesis Data Streams anziché AWS IoT Events |
Elaborazione dei dati: gestisce e organizza i flussi di dati in entrata |
Flussi di dati Kinesis |
Sostituisce la funzionalità AWS IoT Events di input, fornendo l'inserimento dei dati con il partizionamento degli ID del dispositivo per l'elaborazione dei messaggi |
Valutazione logica: elabora i cambiamenti di stato e attiva le azioni |
Evaluator Lambda |
Sostituisce il modello del AWS IoT Events rilevatore, fornendo una valutazione della logica di stato personalizzabile tramite codice anziché un flusso di lavoro visivo |
Gestione dello stato: mantiene gli stati del dispositivo |
DynamoDB tabella |
Nuovo componente che fornisce l'archiviazione persistente degli stati del dispositivo, sostituendo la gestione interna AWS IoT Events dello stato |
Sicurezza: gestisce le autorizzazioni di servizio |
Ruolo IAM |
Le autorizzazioni aggiornate ora includono l'accesso a Kinesis Data Streams, DynamoDB e, in aggiunta alle autorizzazioni esistenti EventBridge AWS IoT Core |
Fase 1: Esporta le configurazioni del modello di rilevatore (facoltativo) AWS IoT Events
Prima di creare nuove risorse, esporta le definizioni del modello di AWS IoT Events rilevatore. Queste contengono la logica di elaborazione degli eventi e possono fungere da riferimento storico per l'implementazione della nuova soluzione.
Fase 2: creazione di un ruolo IAM
Crea un ruolo IAM per fornire le autorizzazioni per replicare la funzionalità di. AWS IoT Events Il ruolo in questo esempio concede l'accesso a DynamoDB per la gestione dello stato, per la pianificazione EventBridge, a Kinesis Data Streams per l'ingestione dei dati, per la pubblicazione di messaggi e per la registrazione. AWS IoT Core CloudWatch Insieme, questi servizi fungeranno da sostituto. AWS IoT Events
-
Creare un ruolo IAM con le seguenti autorizzazioni. Per istruzioni più dettagliate sulla creazione di un ruolo IAM, consulta Creare un ruolo per delegare le autorizzazioni a un AWS servizio nella Guida per l'utente IAM.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
Aggiungi la seguente policy di fiducia per i ruoli IAM. Una policy di fiducia consente ai AWS servizi specificati di assumere il ruolo IAM in modo da poter eseguire le azioni necessarie. Per istruzioni più dettagliate sulla creazione di una policy di fiducia IAM, consulta Create a role using custom trust policy nella IAM User Guide.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
Fase 3: creazione di Amazon Kinesis Data Streams
Crea Amazon Kinesis Data Streams AWS Management Console utilizzando o. AWS CLI
Fase 4: Creare o aggiornare la regola di routing dei messaggi MQTT
È possibile creare una nuova regola di routing dei messaggi MQTT o aggiornare una regola esistente.
Fase 5: Ottenere l'endpoint per l'argomento MQTT di destinazione
Utilizzate l'argomento MQTT di destinazione per configurare dove i vostri argomenti pubblicano i messaggi in uscita, sostituendo la funzionalità precedentemente gestita da. AWS IoT Events L'endpoint è unico per il tuo account e la tua regione AWS .
Fase 6: creare una tabella Amazon DynamoDB
Una tabella Amazon DynamoDB sostituisce la funzionalità AWS IoT Events di gestione dello stato di, fornendo un modo scalabile e flessibile per persistere e gestire lo stato dei dispositivi e la logica del modello di rilevamento nella nuova architettura della soluzione.
Fase 7: Creare una AWS Lambda funzione (console)
La funzione Lambda funge da motore di elaborazione principale, sostituendo la logica di valutazione del modello di rilevatore di. AWS IoT Events Nell'esempio, ci integriamo con altri AWS servizi per gestire i dati in entrata, gestire lo stato e attivare azioni in base alle regole definite dall'utente.
Crea una funzione Lambda con NodeJS runtime. Usa il seguente frammento di codice, sostituendo le costanti codificate:
-
Apri la AWS Lambda console
. -
Scegli Crea funzione.
-
Immettete un nome per il nome della funzione.
-
Seleziona NodeJS 22.x come Runtime.
-
Nel menu a discesa Modifica ruolo di esecuzione predefinito, scegli Usa ruolo esistente, quindi seleziona il ruolo IAM creato nei passaggi precedenti.
-
Scegli Crea funzione.
-
Incolla il seguente frammento di codice dopo aver sostituito le costanti codificate.
-
Dopo la creazione della funzione, nella scheda Codice, incolla il seguente esempio di codice, sostituendo l'
your-destination-endpoint
endpoint con il tuo.
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
Fase 8: aggiungere un trigger Amazon Kinesis Data Streams
Aggiungi un trigger Kinesis Data Streams alla funzione Lambda utilizzando o. AWS Management Console AWS CLI
L'aggiunta di un trigger Kinesis Data Streams alla funzione Lambda stabilisce la connessione tra la pipeline di inserimento dei dati e la logica di elaborazione, consentendole di valutare automaticamente i flussi di dati IoT in entrata e reagire agli eventi in tempo reale, in modo simile a come elabora gli input. AWS IoT Events
Fase 9: Verificare la funzionalità di inserimento e output dei dati ()AWS CLI
Pubblicate un payload sull'argomento MQTT in base a ciò che avete definito nel modello del rilevatore. Di seguito è riportato un esempio di payload relativo all'argomento your-topic-name
MQTT per testare un'implementazione.
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
Dovresti vedere un messaggio MQTT pubblicato su un argomento con il seguente contenuto (o simile):
{ "state": "alarm detected, timer started" }