Procedimento de migração para modelos de detectores em AWS IoT Events - AWS IoT Events

Aviso de fim do suporte: em 20 de maio de 2026, AWS encerrará o suporte para AWS IoT Events. Depois de 20 de maio de 2026, você não poderá mais acessar o AWS IoT Events console ou os AWS IoT Events recursos. Para obter mais informações, consulte AWS IoT Events Fim do suporte.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Procedimento de migração para modelos de detectores em AWS IoT Events

Esta seção descreve soluções alternativas que oferecem funcionalidade semelhante ao modelo de detector à medida que você migra para fora do AWS IoT Events.

Você pode migrar a ingestão de dados por meio de AWS IoT Core regras para uma combinação de outros AWS serviços. Em vez da ingestão de dados por meio da BatchPutMessageAPI, os dados podem ser roteados para o tópico do AWS IoT Core MQTT.

Essa abordagem de migração utiliza os tópicos do AWS IoT Core MQTT como ponto de entrada para seus dados de IoT, substituindo a entrada direta para. AWS IoT Events Os tópicos do MQTT são escolhidos por vários motivos principais. Eles oferecem ampla compatibilidade com dispositivos de IoT devido ao amplo uso do MQTT no setor. Esses tópicos podem lidar com grandes volumes de mensagens de vários dispositivos, garantindo escalabilidade. Eles também oferecem flexibilidade no roteamento e na filtragem de mensagens com base no conteúdo ou no tipo de dispositivo. Além disso, os tópicos do AWS IoT Core MQTT se integram perfeitamente a outros AWS serviços, facilitando o processo de migração.

Os dados fluem dos tópicos do MQTT para uma arquitetura que combina o Amazon Kinesis Data Streams, AWS Lambda uma função, uma tabela do Amazon DynamoDB e agendas da Amazon. EventBridge Essa combinação de serviços replica e aprimora a funcionalidade fornecida anteriormente AWS IoT Events, oferecendo mais flexibilidade e controle sobre seu pipeline de processamento de dados de IoT.

Comparando arquiteturas

A AWS IoT Events arquitetura atual ingere dados por meio de uma AWS IoT Core regra e da BatchPutMessage API. Essa arquitetura é usada AWS IoT Core para ingestão de dados e publicação de eventos, com mensagens roteadas por meio de AWS IoT Events entradas para modelos de detectores que definem a lógica do estado. Uma função do IAM gerencia as permissões necessárias.

A nova solução é mantida AWS IoT Core para ingestão de dados (agora com tópicos dedicados de entrada e saída do MQTT). Ele apresenta o Kinesis Data Streams para particionamento de dados e uma função Lambda avaliadora para lógica de estado. Os estados do dispositivo agora são armazenados em uma tabela do DynamoDB, e uma função aprimorada do IAM gerencia as permissões nesses serviços.

Finalidade Solução Diferenças

Ingestão de dados — recebe dados de dispositivos de IoT

AWS IoT Core

Agora requer dois tópicos distintos do MQTT: um para ingerir dados do dispositivo e outro para publicar eventos de saída

Direção da mensagem — encaminha as mensagens recebidas para os serviços apropriados

AWS IoT Core regra de roteamento de mensagens

Mantém a mesma funcionalidade de roteamento, mas agora direciona as mensagens para o Kinesis Data Streams em vez de AWS IoT Events

Processamento de dados — manipula e organiza fluxos de dados recebidos

Kinesis Data Streams

Substitui a funcionalidade AWS IoT Events de entrada, fornecendo ingestão de dados com particionamento de ID de dispositivo para processamento de mensagens

Avaliação lógica — Processa mudanças de estado e aciona ações

Avaliador Lambda

Substitui o modelo do AWS IoT Events detector, fornecendo avaliação de lógica de estado personalizável por meio de código em vez de fluxo de trabalho visual

Gerenciamento de estado — mantém os estados do dispositivo

Tabela do DynamoDB

Novo componente que fornece armazenamento persistente dos estados do dispositivo, substituindo o gerenciamento interno AWS IoT Events do estado

Segurança — gerencia as permissões do serviço

Perfil do IAM

As permissões atualizadas agora incluem acesso ao Kinesis Data Streams, ao DynamoDB e às permissões EventBridge existentes AWS IoT Core

Etapa 1: (opcional) exportar configurações do modelo do AWS IoT Events detector

Antes de criar novos recursos, exporte as definições do modelo do AWS IoT Events detector. Eles contêm sua lógica de processamento de eventos e podem servir como referência histórica para implementar sua nova solução.

Console

Usando o AWS IoT Events AWS Management Console, execute as seguintes etapas para exportar as configurações do modelo do detector:

Para exportar modelos de detectores usando o AWS Management Console
  1. Faça login no console do AWS IoT Events.

  2. No painel de navegação à esquerda, escolha Detector models (Modelos de detector).

  3. Selecione o modelo do detector a ser exportado.

  4. Escolha Exportar. Leia a mensagem informativa sobre a saída e escolha Exportar novamente.

  5. Repita o processo para cada modelo de detector que você deseja exportar.

Um arquivo contendo uma saída JSON do seu modelo de detector é adicionado à pasta de download do seu navegador. Opcionalmente, você pode salvar cada configuração do modelo de detector para preservar os dados históricos.

AWS CLI

Usando o AWS CLI, execute os seguintes comandos para exportar as configurações do modelo do detector:

Para exportar modelos de detectores usando AWS CLI
  1. Liste todos os modelos de detectores em sua conta:

    aws iotevents list-detector-models
  2. Para cada modelo de detector, exporte sua configuração executando:

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. Salve a saída para cada modelo de detector.

Etapa 2: Criar uma função do IAM

Crie uma função do IAM para fornecer permissões para replicar a funcionalidade do AWS IoT Events. A função neste exemplo concede acesso ao DynamoDB para gerenciamento de estado, agendamento EventBridge, Kinesis Data Streams para AWS IoT Core ingestão de dados, publicação de mensagens e registro em log. CloudWatch Juntos, esses serviços funcionarão como substitutos AWS IoT Events.

  1. Criar uma função do IAM com as permissões a seguir. Para obter instruções mais detalhadas sobre como criar uma função do IAM, consulte Criar uma função para delegar permissões a um AWS serviço no Guia do usuário do IAM.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. Adicione a seguinte política de confiança da função do IAM. Uma política de confiança permite que os AWS serviços especificados assumam a função do IAM para que possam realizar as ações necessárias. Para obter instruções mais detalhadas sobre como criar uma política de confiança do IAM, consulte Criar uma função usando políticas de confiança personalizadas no Guia do usuário do IAM.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

Etapa 3: Criar Amazon Kinesis Data Streams

Crie Amazon Kinesis Data Streams AWS Management Console usando o ou. AWS CLI

Console

Para criar um stream de dados do Kinesis usando o AWS Management Console, siga o procedimento encontrado na página Criar um stream de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams.

Ajuste a contagem de fragmentos com base na contagem de dispositivos e no tamanho da carga útil da mensagem.

AWS CLI

Usando AWS CLI, crie o Amazon Kinesis Data Streams para ingerir e particionar os dados de seus dispositivos.

Os Kinesis Data Streams são usados nessa migração para substituir a funcionalidade de ingestão de dados do. AWS IoT Events Ele fornece uma maneira escalável e eficiente de coletar, processar e analisar dados de streaming em tempo real de seus dispositivos de IoT, ao mesmo tempo em que fornece tratamento flexível de dados e integração com AWS outros serviços.

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

Ajuste a contagem de fragmentos com base na contagem de dispositivos e no tamanho da carga útil da mensagem.

Etapa 4: criar ou atualizar a regra de roteamento de mensagens do MQTT

Você pode criar uma nova regra de roteamento de mensagens MQTT ou atualizar uma regra existente.

Console
  1. Determine se você precisa de uma nova regra de roteamento de mensagens do MQTT ou se você pode atualizar uma regra existente.

  2. Abra o console de AWS IoT Core.

  3. No painel de navegação, escolha Encaminhamento de mensagens e, em seguida, escolha Regras.

  4. Na seção Gerenciar, escolha Roteamento de mensagens e, em seguida, Regras.

  5. Escolha Criar regra.

  6. Na página Especificar propriedades da regra, insira o nome da AWS IoT Core regra em Nome da regra. Em Descrição da regra, opcional, insira uma descrição para identificar que você está processando eventos e encaminhando-os para o Kinesis Data Streams.

  7. Na página Configurar instrução SQL, insira o seguinte para a instrução SQL: SELECT * FROM 'your-database' e escolha Avançar.

  8. Na página Anexar ações de regras e em Ações de regras, escolha kinesis.

  9. Escolha seu stream do Kinesis para o stream. Para a chave de partição, insira your-instance-id. Selecione a função apropriada para a função do IAM e, em seguida, escolha Adicionar ação de regra.

Para obter mais informações, consulte Criação de regras de AWS IoT para rotear dados do dispositivo para outros serviços.

AWS CLI
  1. Crie um arquivo JSON com o conteúdo apresentado a seguir. Esse arquivo de configuração JSON define uma AWS IoT Core regra que seleciona todas as mensagens de um tópico e as encaminha para o stream especificado do Kinesis, usando o ID da instância como chave de partição.

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. Crie a regra de tópico do MQTT usando o. AWS CLI Essa etapa usa o AWS CLI para criar uma regra de AWS IoT Core tópico usando a configuração definida no events_rule.json arquivo.

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

Etapa 5: Obtenha o endpoint para o tópico MQTT de destino

Use o tópico MQTT de destino para configurar onde seus tópicos publicam mensagens de saída, substituindo a funcionalidade anteriormente tratada pelo. AWS IoT Events O endpoint é exclusivo para sua AWS conta e região.

Console
  1. Abra o console de AWS IoT Core.

  2. Na seção Connect no painel de navegação esquerdo, escolha Configuração do domínio.

  3. Escolha a configuração do domínio IoT:Data-ATS para abrir a página de detalhes da configuração.

  4. Copie o valor do nome de domínio. Esse valor é o endpoint. Salve o valor do endpoint porque você precisará dele em etapas posteriores.

AWS CLI

Execute o comando a seguir para obter o AWS IoT Core endpoint para publicar mensagens de saída para sua conta.

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

Etapa 6: criar uma tabela do Amazon DynamoDB

Uma tabela do Amazon DynamoDB substitui a funcionalidade de gerenciamento de estado AWS IoT Events do, fornecendo uma forma escalável e flexível de persistir e gerenciar o estado de seus dispositivos e a lógica do modelo de detector em sua nova arquitetura de solução.

Console

Crie uma tabela do Amazon DynamoDB para manter o estado dos modelos de detectores. Para obter mais informações, consulte Criar uma tabela no DynamoDB no Amazon DynamoDB Developer Guide.

Use o seguinte para obter os detalhes da tabela:

  • Em Nome da tabela, insira um nome de tabela de sua escolha.

  • Em Partition Key, insira seu próprio ID de instância.

  • Você pode usar as configurações padrão para as configurações da tabela

AWS CLI

Execute o comando a seguir para criar uma tabela do DynamoDB.

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

Etapa 7: criar uma AWS Lambda função (console)

A função Lambda serve como o principal mecanismo de processamento, substituindo a lógica de avaliação do modelo de detector do. AWS IoT Events No exemplo, nos integramos com outros AWS serviços para lidar com dados recebidos, gerenciar o estado e acionar ações com base nas regras definidas por você.

Crie uma função Lambda com NodeJS tempo de execução. Use o seguinte trecho de código, substituindo as constantes codificadas:

  1. Abra a AWS Lambda console.

  2. Escolha a opção Criar função.

  3. Insira um nome para o nome da função.

  4. Selecione NodeJS 22.x como Runtime.

  5. No menu suspenso Alterar função de execução padrão, escolha Usar função existente e selecione a função do IAM que você criou nas etapas anteriores.

  6. Escolha a opção Criar função.

  7. Cole o seguinte trecho de código depois de substituir as constantes codificadas.

  8. Depois que sua função for criada, na guia Código, cole o exemplo de código a seguir, substituindo o your-destination-endpoint endpoint pelo seu.

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

Etapa 8: Adicionar um gatilho do Amazon Kinesis Data Streams

Adicione um gatilho do Kinesis Data Streams à função Lambda usando o ou. AWS Management Console AWS CLI

Adicionar um gatilho do Kinesis Data Streams à sua função Lambda estabelece a conexão entre seu pipeline de ingestão de dados e sua lógica de processamento, permitindo que ele avalie automaticamente os fluxos de dados de IoT recebidos e reaja aos eventos em tempo real, da mesma forma que os processos entram. AWS IoT Events

Console

Para obter mais informações, consulte Criar um mapeamento de origem de eventos para invocar uma função Lambda no Guia AWS Lambda do desenvolvedor.

Use o seguinte para obter os detalhes do mapeamento da origem do evento:

AWS CLI

Execute o comando a seguir para criar o gatilho da função Lambda.

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

Etapa 9: testar a funcionalidade de ingestão e saída de dados ()AWS CLI

Publique uma carga no tópico do MQTT com base no que você definiu em seu modelo de detector. Veja a seguir um exemplo de carga útil para o tópico MQTT your-topic-name para testar uma implementação.

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

Você deve ver uma mensagem MQTT publicada em um tópico com o seguinte conteúdo (ou similar):

{ "state": "alarm detected, timer started" }