Aviso de fim do suporte: em 20 de maio de 2026, AWS encerrará o suporte para AWS IoT Events. Depois de 20 de maio de 2026, você não poderá mais acessar o AWS IoT Events console ou os AWS IoT Events recursos. Para obter mais informações, consulte AWS IoT Events Fim do suporte.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Procedimento de migração para modelos de detectores em AWS IoT Events
Esta seção descreve soluções alternativas que oferecem funcionalidade semelhante ao modelo de detector à medida que você migra para fora do AWS IoT Events.
Você pode migrar a ingestão de dados por meio de AWS IoT Core regras para uma combinação de outros AWS serviços. Em vez da ingestão de dados por meio da BatchPutMessageAPI, os dados podem ser roteados para o tópico do AWS IoT Core MQTT.
Essa abordagem de migração utiliza os tópicos do AWS IoT Core MQTT como ponto de entrada para seus dados de IoT, substituindo a entrada direta para. AWS IoT Events Os tópicos do MQTT são escolhidos por vários motivos principais. Eles oferecem ampla compatibilidade com dispositivos de IoT devido ao amplo uso do MQTT no setor. Esses tópicos podem lidar com grandes volumes de mensagens de vários dispositivos, garantindo escalabilidade. Eles também oferecem flexibilidade no roteamento e na filtragem de mensagens com base no conteúdo ou no tipo de dispositivo. Além disso, os tópicos do AWS IoT Core MQTT se integram perfeitamente a outros AWS serviços, facilitando o processo de migração.
Os dados fluem dos tópicos do MQTT para uma arquitetura que combina o Amazon Kinesis Data Streams, AWS Lambda uma função, uma tabela do Amazon DynamoDB e agendas da Amazon. EventBridge Essa combinação de serviços replica e aprimora a funcionalidade fornecida anteriormente AWS IoT Events, oferecendo mais flexibilidade e controle sobre seu pipeline de processamento de dados de IoT.
Comparando arquiteturas
A AWS IoT Events arquitetura atual ingere dados por meio de uma AWS IoT Core regra e da BatchPutMessage
API. Essa arquitetura é usada AWS IoT Core para ingestão de dados e publicação de eventos, com mensagens roteadas por meio de AWS IoT Events entradas para modelos de detectores que definem a lógica do estado. Uma função do IAM gerencia as permissões necessárias.
A nova solução é mantida AWS IoT Core para ingestão de dados (agora com tópicos dedicados de entrada e saída do MQTT). Ele apresenta o Kinesis Data Streams para particionamento de dados e uma função Lambda avaliadora para lógica de estado. Os estados do dispositivo agora são armazenados em uma tabela do DynamoDB, e uma função aprimorada do IAM gerencia as permissões nesses serviços.
Finalidade | Solução | Diferenças |
---|---|---|
Ingestão de dados — recebe dados de dispositivos de IoT |
AWS IoT Core |
Agora requer dois tópicos distintos do MQTT: um para ingerir dados do dispositivo e outro para publicar eventos de saída |
Direção da mensagem — encaminha as mensagens recebidas para os serviços apropriados |
AWS IoT Core regra de roteamento de mensagens |
Mantém a mesma funcionalidade de roteamento, mas agora direciona as mensagens para o Kinesis Data Streams em vez de AWS IoT Events |
Processamento de dados — manipula e organiza fluxos de dados recebidos |
Kinesis Data Streams |
Substitui a funcionalidade AWS IoT Events de entrada, fornecendo ingestão de dados com particionamento de ID de dispositivo para processamento de mensagens |
Avaliação lógica — Processa mudanças de estado e aciona ações |
Avaliador Lambda |
Substitui o modelo do AWS IoT Events detector, fornecendo avaliação de lógica de estado personalizável por meio de código em vez de fluxo de trabalho visual |
Gerenciamento de estado — mantém os estados do dispositivo |
Tabela do DynamoDB |
Novo componente que fornece armazenamento persistente dos estados do dispositivo, substituindo o gerenciamento interno AWS IoT Events do estado |
Segurança — gerencia as permissões do serviço |
Perfil do IAM |
As permissões atualizadas agora incluem acesso ao Kinesis Data Streams, ao DynamoDB e às permissões EventBridge existentes AWS IoT Core |
Etapa 1: (opcional) exportar configurações do modelo do AWS IoT Events detector
Antes de criar novos recursos, exporte as definições do modelo do AWS IoT Events detector. Eles contêm sua lógica de processamento de eventos e podem servir como referência histórica para implementar sua nova solução.
Etapa 2: Criar uma função do IAM
Crie uma função do IAM para fornecer permissões para replicar a funcionalidade do AWS IoT Events. A função neste exemplo concede acesso ao DynamoDB para gerenciamento de estado, agendamento EventBridge, Kinesis Data Streams para AWS IoT Core ingestão de dados, publicação de mensagens e registro em log. CloudWatch Juntos, esses serviços funcionarão como substitutos AWS IoT Events.
-
Criar uma função do IAM com as permissões a seguir. Para obter instruções mais detalhadas sobre como criar uma função do IAM, consulte Criar uma função para delegar permissões a um AWS serviço no Guia do usuário do IAM.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
Adicione a seguinte política de confiança da função do IAM. Uma política de confiança permite que os AWS serviços especificados assumam a função do IAM para que possam realizar as ações necessárias. Para obter instruções mais detalhadas sobre como criar uma política de confiança do IAM, consulte Criar uma função usando políticas de confiança personalizadas no Guia do usuário do IAM.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
Etapa 3: Criar Amazon Kinesis Data Streams
Crie Amazon Kinesis Data Streams AWS Management Console usando o ou. AWS CLI
Etapa 4: criar ou atualizar a regra de roteamento de mensagens do MQTT
Você pode criar uma nova regra de roteamento de mensagens MQTT ou atualizar uma regra existente.
Etapa 5: Obtenha o endpoint para o tópico MQTT de destino
Use o tópico MQTT de destino para configurar onde seus tópicos publicam mensagens de saída, substituindo a funcionalidade anteriormente tratada pelo. AWS IoT Events O endpoint é exclusivo para sua AWS conta e região.
Etapa 6: criar uma tabela do Amazon DynamoDB
Uma tabela do Amazon DynamoDB substitui a funcionalidade de gerenciamento de estado AWS IoT Events do, fornecendo uma forma escalável e flexível de persistir e gerenciar o estado de seus dispositivos e a lógica do modelo de detector em sua nova arquitetura de solução.
Etapa 7: criar uma AWS Lambda função (console)
A função Lambda serve como o principal mecanismo de processamento, substituindo a lógica de avaliação do modelo de detector do. AWS IoT Events No exemplo, nos integramos com outros AWS serviços para lidar com dados recebidos, gerenciar o estado e acionar ações com base nas regras definidas por você.
Crie uma função Lambda com NodeJS tempo de execução. Use o seguinte trecho de código, substituindo as constantes codificadas:
-
Abra a AWS Lambda console
. -
Escolha a opção Criar função.
-
Insira um nome para o nome da função.
-
Selecione NodeJS 22.x como Runtime.
-
No menu suspenso Alterar função de execução padrão, escolha Usar função existente e selecione a função do IAM que você criou nas etapas anteriores.
-
Escolha a opção Criar função.
-
Cole o seguinte trecho de código depois de substituir as constantes codificadas.
-
Depois que sua função for criada, na guia Código, cole o exemplo de código a seguir, substituindo o
your-destination-endpoint
endpoint pelo seu.
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
Etapa 8: Adicionar um gatilho do Amazon Kinesis Data Streams
Adicione um gatilho do Kinesis Data Streams à função Lambda usando o ou. AWS Management Console AWS CLI
Adicionar um gatilho do Kinesis Data Streams à sua função Lambda estabelece a conexão entre seu pipeline de ingestão de dados e sua lógica de processamento, permitindo que ele avalie automaticamente os fluxos de dados de IoT recebidos e reaja aos eventos em tempo real, da mesma forma que os processos entram. AWS IoT Events
Etapa 9: testar a funcionalidade de ingestão e saída de dados ()AWS CLI
Publique uma carga no tópico do MQTT com base no que você definiu em seu modelo de detector. Veja a seguir um exemplo de carga útil para o tópico MQTT your-topic-name
para testar uma implementação.
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
Você deve ver uma mensagem MQTT publicada em um tópico com o seguinte conteúdo (ou similar):
{ "state": "alarm detected, timer started" }