Procedimiento de migración de modelos de detectores en AWS IoT Events - AWS IoT Events

Aviso de fin de soporte: el 20 de mayo de 2026, AWS finalizará el soporte para AWS IoT Events. Después del 20 de mayo de 2026, ya no podrás acceder a la AWS IoT Events consola ni a AWS IoT Events los recursos. Para obtener más información, consulta AWS IoT Events el fin del soporte.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Procedimiento de migración de modelos de detectores en AWS IoT Events

En esta sección se describen soluciones alternativas que ofrecen una funcionalidad similar a la de un modelo de detector a medida que se migra AWS IoT Events.

Puede migrar la ingesta de datos mediante AWS IoT Core reglas a una combinación de otros AWS servicios. En lugar de la ingesta de datos a través de la BatchPutMessageAPI, los datos se pueden dirigir al tema MQTT. AWS IoT Core

Este enfoque de migración aprovecha los temas de AWS IoT Core MQTT como punto de entrada para sus datos de IoT, sustituyendo la entrada directa a. AWS IoT Events Los temas de MQTT se eligen por varios motivos clave. Ofrecen una amplia compatibilidad con los dispositivos de IoT debido al uso generalizado de MQTT en la industria. Estos temas permiten gestionar grandes volúmenes de mensajes procedentes de numerosos dispositivos, lo que garantiza la escalabilidad. También proporcionan flexibilidad a la hora de enrutar y filtrar los mensajes según el contenido o el tipo de dispositivo. Además, los temas de AWS IoT Core MQTT se integran perfectamente con otros AWS servicios, lo que facilita el proceso de migración.

Los datos fluyen de los temas de MQTT a una arquitectura que combina Amazon Kinesis Data Streams, AWS Lambda una función, una tabla de Amazon DynamoDB y las planificaciones de Amazon. EventBridge Esta combinación de servicios replica y mejora la funcionalidad que anteriormente ofrecía AWS IoT Events, ofreciéndole más flexibilidad y control sobre su proceso de procesamiento de datos de IoT.

Comparación de arquitecturas

La AWS IoT Events arquitectura actual ingiere datos a través de una AWS IoT Core regla y la BatchPutMessage API. Esta arquitectura se utiliza AWS IoT Core para la ingesta de datos y la publicación de eventos, con mensajes enrutados a través de AWS IoT Events las entradas a modelos de detección que definen la lógica de estado. Un rol de IAM administra los permisos necesarios.

La nueva solución se ocupa de AWS IoT Core la ingesta de datos (ahora con temas específicos de MQTT de entrada y salida). Presenta Kinesis Data Streams para el particionamiento de datos y una función Lambda evaluadora para la lógica de estados. Los estados de los dispositivos ahora se almacenan en una tabla de DynamoDB y una función de IAM mejorada administra los permisos en estos servicios.

Finalidad Solución Diferencias

Ingesta de datos: recibe datos de dispositivos de IoT

AWS IoT Core

Ahora requiere dos temas distintos de MQTT: uno para la ingesta de datos del dispositivo y otro para la publicación de los eventos de salida

Dirección de los mensajes: enruta los mensajes entrantes a los servicios correspondientes

AWS IoT Core regla de enrutamiento de mensajes

Mantiene la misma funcionalidad de enrutamiento, pero ahora dirige los mensajes a Kinesis Data Streams en lugar de AWS IoT Events

Procesamiento de datos: gestiona y organiza los flujos de datos entrantes

Kinesis Data Streams

Sustituye la funcionalidad AWS IoT Events de entrada y permite la ingesta de datos mediante la partición de la ID del dispositivo para el procesamiento de los mensajes

Evaluación lógica: procesa los cambios de estado y desencadena acciones

Evaluador Lambda

Sustituye al modelo de AWS IoT Events detector y proporciona una evaluación de la lógica de estado personalizable mediante código en lugar de un flujo de trabajo visual

Administración del estado: mantiene los estados de los dispositivos

Tabla de DynamoDB

Nuevo componente que proporciona un almacenamiento persistente de los estados de los dispositivos, en sustitución de la administración interna AWS IoT Events del estado

Seguridad: administra los permisos de servicio

rol de IAM

Los permisos actualizados ahora incluyen el acceso a Kinesis Data Streams y DynamoDB, EventBridge además de los permisos existentes AWS IoT Core

Paso 1: (opcional) exportar AWS IoT Events las configuraciones del modelo de detector

Antes de crear nuevos recursos, exporte las definiciones AWS IoT Events del modelo de detector. Estos contienen su lógica de procesamiento de eventos y pueden servir como referencia histórica para implementar su nueva solución.

Console

Con ellas AWS IoT Events AWS Management Console, lleve a cabo los siguientes pasos para exportar las configuraciones del modelo de su detector:

Para exportar modelos de detectores mediante el AWS Management Console
  1. Inicie sesión en la consola de AWS IoT Events.

  2. En el panel de navegación izquierdo, elija Detector models (Modelos de detector).

  3. Seleccione el modelo de detector que desee exportar.

  4. Seleccione Exportar. Lea el mensaje de información sobre la salida y, a continuación, vuelva a seleccionar Exportar.

  5. Repita el proceso para cada modelo de detector que desee exportar.

Se agrega un archivo que contiene una salida JSON de su modelo de detector a la carpeta de descargas de su navegador. Si lo desea, puede guardar la configuración de cada modelo de detector para conservar los datos históricos.

AWS CLI

Con el AWS CLI, ejecute los siguientes comandos para exportar las configuraciones del modelo de detector:

Para exportar modelos de detectores mediante AWS CLI
  1. Enumere todos los modelos de detectores de su cuenta:

    aws iotevents list-detector-models
  2. Para cada modelo de detector, exporte su configuración ejecutando:

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. Guarde la salida de cada modelo de detector.

Paso 2: creación de un rol de IAM

Cree un rol de IAM para proporcionar permisos para replicar la funcionalidad de AWS IoT Events. El rol de este ejemplo concede acceso a DynamoDB para la administración del estado, la programación EventBridge, y a Kinesis Data Streams para la AWS IoT Core ingesta de datos, la publicación de mensajes y el registro. CloudWatch En conjunto, estos servicios funcionarán como sustitutos. AWS IoT Events

  1. Cree un rol de IAM con los siguientes permisos. Para obtener instrucciones más detalladas sobre la creación de un rol de IAM, consulte Crear un rol para delegar permisos a un AWS servicio en la Guía del usuario de IAM.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. Añada la siguiente política de confianza de roles de IAM. Una política de confianza permite a AWS los servicios especificados asumir la función de IAM para poder realizar las acciones necesarias. Para obtener instrucciones más detalladas sobre cómo crear una política de confianza de IAM, consulte Crear un rol mediante políticas de confianza personalizadas en la Guía del usuario de IAM.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

Paso 3: Crear Amazon Kinesis Data Streams

Cree Amazon Kinesis Data Streams con AWS Management Console o. AWS CLI

Console

Para crear una transmisión de datos de Kinesis mediante el AWS Management Console, siga el procedimiento que se encuentra en la página Crear una transmisión de datos de la Guía para desarrolladores de Amazon Kinesis Data Streams.

Ajuste el recuento de fragmentos en función del número de dispositivos y del tamaño de la carga útil de los mensajes.

AWS CLI

Utilice AWS CLI Amazon Kinesis Data Streams para incorporar y particionar los datos de sus dispositivos.

En esta migración se utiliza Kinesis Data Streams para reemplazar la funcionalidad de ingesta de datos de. AWS IoT Events Proporciona una forma escalable y eficiente de recopilar, procesar y analizar datos de streaming en tiempo real desde sus dispositivos de IoT, a la vez que proporciona un manejo flexible de los datos y la integración con otros AWS servicios.

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

Ajusta el número de fragmentos en función del número de dispositivos y del tamaño de la carga útil de los mensajes.

Paso 4: Cree o actualice la regla de enrutamiento de mensajes de MQTT

Puede crear una nueva regla de enrutamiento de mensajes de MQTT o actualizar una regla existente.

Console
  1. Determine si necesita una nueva regla de enrutamiento de mensajes de MQTT o si puede actualizar una regla existente.

  2. Abra la consola de AWS IoT Core.

  3. En el panel de navegación, elija Enrutamiento de mensajes y, a continuación, Reglas.

  4. En la sección Administrar, selecciona Enrutamiento de mensajes y, a continuación, Reglas.

  5. Seleccione Creación de regla.

  6. En la página Especificar las propiedades de la regla, introduzca el nombre de la AWS IoT Core regla para Nombre de la regla. En Descripción de la regla (opcional), introduzca una descripción para identificar que está procesando eventos y reenviándolos a Kinesis Data Streams.

  7. En la página Configurar una sentencia SQL, introduzca lo siguiente para la sentencia SQL: y, a continuaciónSELECT * FROM 'your-database', seleccione Siguiente.

  8. En la página Adjuntar reglas y acciones y, en Acciones de regla, elija kinesis.

  9. Elija su transmisión de Kinesis para la transmisión. Escriba your-instance-id como clave de partición. Seleccione la función adecuada para la función de IAM y, a continuación, elija Añadir acción de regla.

Para obtener más información, consulte Crear reglas de AWS IoT para enrutar los datos del dispositivo a otros servicios.

AWS CLI
  1. Cree un archivo JSON denominado con el siguiente contenido. Este archivo de configuración JSON define una AWS IoT Core regla que selecciona todos los mensajes de un tema y los reenvía a la transmisión de Kinesis especificada, utilizando el ID de instancia como clave de partición.

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. Cree la regla temática de MQTT con. AWS CLI En este paso se utiliza AWS CLI para crear una regla AWS IoT Core temática con la configuración definida en el events_rule.json archivo.

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

Paso 5: Obtenga el punto final del tema MQTT de destino

Utilice el tema MQTT de destino para configurar dónde publican sus temas los mensajes salientes, sustituyendo la funcionalidad que antes utilizaba. AWS IoT Events El punto final es exclusivo de su AWS cuenta y región.

Console
  1. Abra la consola de AWS IoT Core.

  2. En la sección Connect del panel de navegación izquierdo, selecciona Configuración de dominio.

  3. Elija la configuración del dominio IoT:DATA-ATS para abrir la página de detalles de la configuración.

  4. Copie el valor del nombre de dominio. Este valor es el punto final. Guarde el valor del punto final, ya que lo necesitará en pasos posteriores.

AWS CLI

Ejecuta el siguiente comando para obtener el AWS IoT Core punto final para publicar los mensajes salientes de tu cuenta.

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

Paso 6: Crear una tabla de Amazon DynamoDB

Una tabla de Amazon DynamoDB reemplaza la funcionalidad de administración de estados AWS IoT Events de, lo que proporciona una forma escalable y flexible de conservar y administrar el estado de sus dispositivos y la lógica del modelo de detector en la nueva arquitectura de soluciones.

Console

Cree una tabla de Amazon DynamoDB para conservar el estado de los modelos de detectores. Para obtener más información, consulte Crear una tabla en DynamoDB en la Guía para desarrolladores de Amazon DynamoDB.

Utilice lo siguiente para ver los detalles de la tabla:

  • En Nombre de tabla, introduzca el nombre de tabla que desee.

  • En Clave de partición, introduce tu propio ID de instancia.

  • Puede usar la configuración predeterminada para la configuración de la tabla

AWS CLI

Ejecute el siguiente comando para crear una tabla de DynamoDB.

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

Paso 7: Crear una AWS Lambda función (consola)

La función Lambda actúa como motor de procesamiento central y reemplaza la lógica de evaluación del modelo de detector de. AWS IoT Events En el ejemplo, nos integramos con otros AWS servicios para gestionar los datos entrantes, gestionar el estado y activar acciones en función de las reglas definidas.

Cree una función Lambda con NodeJS tiempo de ejecución. Utilice el siguiente fragmento de código para sustituir las constantes codificadas de forma rígida:

  1. Abra la AWS Lambda console.

  2. Elija Crear función.

  3. Introduzca un nombre para el nombre de la función.

  4. Seleccione Nodejs 22.x como motor de ejecución.

  5. En el menú desplegable Cambiar la función de ejecución predeterminada, elija Usar la función existente y, a continuación, seleccione la función de IAM que creó en los pasos anteriores.

  6. Elija Crear función.

  7. Pegue el siguiente fragmento de código después de reemplazar las constantes codificadas de forma rígida.

  8. Una vez creada la función, en la pestaña Código, pega el siguiente ejemplo de código y reemplaza el your-destination-endpoint punto final por el tuyo.

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

Paso 8: Añadir un activador de Amazon Kinesis Data Streams

Añada un activador de Kinesis Data Streams a la función Lambda mediante o. AWS Management Console AWS CLI

Al añadir un activador de Kinesis Data Streams a la función Lambda, se establece la conexión entre el proceso de ingesta de datos y la lógica de procesamiento, lo que le permite evaluar automáticamente los flujos de datos de IoT entrantes y reaccionar ante los eventos en tiempo real, de forma similar a como se procesa las entradas. AWS IoT Events

Console

Para obtener más información, consulte Crear un mapeo de origen de eventos para invocar una función Lambda en AWS Lambda la Guía para desarrolladores.

Utilice lo siguiente para obtener los detalles del mapeo de la fuente de eventos:

AWS CLI

Ejecute el siguiente comando para crear el activador de la función Lambda.

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

Paso 9: Pruebe la funcionalidad de entrada y salida de datos ()AWS CLI

Publique una carga útil en el tema MQTT en función de lo que haya definido en su modelo de detector. El siguiente es un ejemplo de carga útil del tema your-topic-name MQTT para probar una implementación.

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

Debería ver un mensaje de MQTT publicado en un tema con el siguiente contenido (o similar):

{ "state": "alarm detected, timer started" }