Aviso de fin de soporte: el 20 de mayo de 2026, AWS finalizará el soporte para AWS IoT Events. Después del 20 de mayo de 2026, ya no podrás acceder a la AWS IoT Events consola ni a AWS IoT Events los recursos. Para obtener más información, consulta AWS IoT Events el fin del soporte.
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Procedimiento de migración de modelos de detectores en AWS IoT Events
En esta sección se describen soluciones alternativas que ofrecen una funcionalidad similar a la de un modelo de detector a medida que se migra AWS IoT Events.
Puede migrar la ingesta de datos mediante AWS IoT Core reglas a una combinación de otros AWS servicios. En lugar de la ingesta de datos a través de la BatchPutMessageAPI, los datos se pueden dirigir al tema MQTT. AWS IoT Core
Este enfoque de migración aprovecha los temas de AWS IoT Core MQTT como punto de entrada para sus datos de IoT, sustituyendo la entrada directa a. AWS IoT Events Los temas de MQTT se eligen por varios motivos clave. Ofrecen una amplia compatibilidad con los dispositivos de IoT debido al uso generalizado de MQTT en la industria. Estos temas permiten gestionar grandes volúmenes de mensajes procedentes de numerosos dispositivos, lo que garantiza la escalabilidad. También proporcionan flexibilidad a la hora de enrutar y filtrar los mensajes según el contenido o el tipo de dispositivo. Además, los temas de AWS IoT Core MQTT se integran perfectamente con otros AWS servicios, lo que facilita el proceso de migración.
Los datos fluyen de los temas de MQTT a una arquitectura que combina Amazon Kinesis Data Streams, AWS Lambda una función, una tabla de Amazon DynamoDB y las planificaciones de Amazon. EventBridge Esta combinación de servicios replica y mejora la funcionalidad que anteriormente ofrecía AWS IoT Events, ofreciéndole más flexibilidad y control sobre su proceso de procesamiento de datos de IoT.
Comparación de arquitecturas
La AWS IoT Events arquitectura actual ingiere datos a través de una AWS IoT Core regla y la BatchPutMessage
API. Esta arquitectura se utiliza AWS IoT Core para la ingesta de datos y la publicación de eventos, con mensajes enrutados a través de AWS IoT Events las entradas a modelos de detección que definen la lógica de estado. Un rol de IAM administra los permisos necesarios.
La nueva solución se ocupa de AWS IoT Core la ingesta de datos (ahora con temas específicos de MQTT de entrada y salida). Presenta Kinesis Data Streams para el particionamiento de datos y una función Lambda evaluadora para la lógica de estados. Los estados de los dispositivos ahora se almacenan en una tabla de DynamoDB y una función de IAM mejorada administra los permisos en estos servicios.
Finalidad | Solución | Diferencias |
---|---|---|
Ingesta de datos: recibe datos de dispositivos de IoT |
AWS IoT Core |
Ahora requiere dos temas distintos de MQTT: uno para la ingesta de datos del dispositivo y otro para la publicación de los eventos de salida |
Dirección de los mensajes: enruta los mensajes entrantes a los servicios correspondientes |
AWS IoT Core regla de enrutamiento de mensajes |
Mantiene la misma funcionalidad de enrutamiento, pero ahora dirige los mensajes a Kinesis Data Streams en lugar de AWS IoT Events |
Procesamiento de datos: gestiona y organiza los flujos de datos entrantes |
Kinesis Data Streams |
Sustituye la funcionalidad AWS IoT Events de entrada y permite la ingesta de datos mediante la partición de la ID del dispositivo para el procesamiento de los mensajes |
Evaluación lógica: procesa los cambios de estado y desencadena acciones |
Evaluador Lambda |
Sustituye al modelo de AWS IoT Events detector y proporciona una evaluación de la lógica de estado personalizable mediante código en lugar de un flujo de trabajo visual |
Administración del estado: mantiene los estados de los dispositivos |
Tabla de DynamoDB |
Nuevo componente que proporciona un almacenamiento persistente de los estados de los dispositivos, en sustitución de la administración interna AWS IoT Events del estado |
Seguridad: administra los permisos de servicio |
rol de IAM |
Los permisos actualizados ahora incluyen el acceso a Kinesis Data Streams y DynamoDB, EventBridge además de los permisos existentes AWS IoT Core |
Paso 1: (opcional) exportar AWS IoT Events las configuraciones del modelo de detector
Antes de crear nuevos recursos, exporte las definiciones AWS IoT Events del modelo de detector. Estos contienen su lógica de procesamiento de eventos y pueden servir como referencia histórica para implementar su nueva solución.
Paso 2: creación de un rol de IAM
Cree un rol de IAM para proporcionar permisos para replicar la funcionalidad de AWS IoT Events. El rol de este ejemplo concede acceso a DynamoDB para la administración del estado, la programación EventBridge, y a Kinesis Data Streams para la AWS IoT Core ingesta de datos, la publicación de mensajes y el registro. CloudWatch En conjunto, estos servicios funcionarán como sustitutos. AWS IoT Events
-
Cree un rol de IAM con los siguientes permisos. Para obtener instrucciones más detalladas sobre la creación de un rol de IAM, consulte Crear un rol para delegar permisos a un AWS servicio en la Guía del usuario de IAM.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
Añada la siguiente política de confianza de roles de IAM. Una política de confianza permite a AWS los servicios especificados asumir la función de IAM para poder realizar las acciones necesarias. Para obtener instrucciones más detalladas sobre cómo crear una política de confianza de IAM, consulte Crear un rol mediante políticas de confianza personalizadas en la Guía del usuario de IAM.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
Paso 3: Crear Amazon Kinesis Data Streams
Cree Amazon Kinesis Data Streams con AWS Management Console o. AWS CLI
Paso 4: Cree o actualice la regla de enrutamiento de mensajes de MQTT
Puede crear una nueva regla de enrutamiento de mensajes de MQTT o actualizar una regla existente.
Paso 5: Obtenga el punto final del tema MQTT de destino
Utilice el tema MQTT de destino para configurar dónde publican sus temas los mensajes salientes, sustituyendo la funcionalidad que antes utilizaba. AWS IoT Events El punto final es exclusivo de su AWS cuenta y región.
Paso 6: Crear una tabla de Amazon DynamoDB
Una tabla de Amazon DynamoDB reemplaza la funcionalidad de administración de estados AWS IoT Events de, lo que proporciona una forma escalable y flexible de conservar y administrar el estado de sus dispositivos y la lógica del modelo de detector en la nueva arquitectura de soluciones.
Paso 7: Crear una AWS Lambda función (consola)
La función Lambda actúa como motor de procesamiento central y reemplaza la lógica de evaluación del modelo de detector de. AWS IoT Events En el ejemplo, nos integramos con otros AWS servicios para gestionar los datos entrantes, gestionar el estado y activar acciones en función de las reglas definidas.
Cree una función Lambda con NodeJS tiempo de ejecución. Utilice el siguiente fragmento de código para sustituir las constantes codificadas de forma rígida:
-
Abra la AWS Lambda console
. -
Elija Crear función.
-
Introduzca un nombre para el nombre de la función.
-
Seleccione Nodejs 22.x como motor de ejecución.
-
En el menú desplegable Cambiar la función de ejecución predeterminada, elija Usar la función existente y, a continuación, seleccione la función de IAM que creó en los pasos anteriores.
-
Elija Crear función.
-
Pegue el siguiente fragmento de código después de reemplazar las constantes codificadas de forma rígida.
-
Una vez creada la función, en la pestaña Código, pega el siguiente ejemplo de código y reemplaza el
your-destination-endpoint
punto final por el tuyo.
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
Paso 8: Añadir un activador de Amazon Kinesis Data Streams
Añada un activador de Kinesis Data Streams a la función Lambda mediante o. AWS Management Console AWS CLI
Al añadir un activador de Kinesis Data Streams a la función Lambda, se establece la conexión entre el proceso de ingesta de datos y la lógica de procesamiento, lo que le permite evaluar automáticamente los flujos de datos de IoT entrantes y reaccionar ante los eventos en tiempo real, de forma similar a como se procesa las entradas. AWS IoT Events
Paso 9: Pruebe la funcionalidad de entrada y salida de datos ()AWS CLI
Publique una carga útil en el tema MQTT en función de lo que haya definido en su modelo de detector. El siguiente es un ejemplo de carga útil del tema your-topic-name
MQTT para probar una implementación.
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
Debería ver un mensaje de MQTT publicado en un tema con el siguiente contenido (o similar):
{ "state": "alarm detected, timer started" }