Avis de fin de support : le 20 mai 2026, AWS le support de AWS IoT Events. Après le 20 mai 2026, vous ne pourrez plus accéder à la AWS IoT Events console ni aux AWS IoT Events ressources. Pour plus d'informations, consultez AWS IoT Events la fin du support.
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Procédure de migration pour les modèles de détecteurs dans AWS IoT Events
Cette section décrit les solutions alternatives qui fournissent des fonctionnalités de modèle de détecteur similaires à celles que vous utilisez lors de la migration AWS IoT Events.
Vous pouvez migrer l'ingestion de données via des AWS IoT Core règles vers une combinaison d'autres AWS services. Au lieu d'être ingérées via l'BatchPutMessageAPI, les données peuvent être acheminées vers le sujet AWS IoT Core MQTT.
Cette approche de migration utilise les sujets AWS IoT Core MQTT comme point d'entrée pour vos données IoT, en remplacement de la saisie directe dans. AWS IoT Events Les sujets MQTT sont choisis pour plusieurs raisons principales. Ils offrent une large compatibilité avec les appareils IoT en raison de l'utilisation généralisée du MQTT dans l'industrie. Ces rubriques peuvent traiter de gros volumes de messages provenant de nombreux appareils, ce qui garantit l'évolutivité. Ils offrent également une flexibilité dans le routage et le filtrage des messages en fonction du contenu ou du type d'appareil. De plus, les rubriques AWS IoT Core MQTT s'intègrent parfaitement aux autres AWS services, ce qui facilite le processus de migration.
Les données issues de sujets MQTT circulent vers une architecture combinant Amazon Kinesis Data Streams, AWS Lambda une fonction, une table Amazon DynamoDB et des plannings Amazon. EventBridge Cette combinaison de services reproduit et améliore les fonctionnalités précédemment fournies par AWS IoT Events, vous offrant ainsi plus de flexibilité et de contrôle sur votre pipeline de traitement des données IoT.
Comparaison des architectures
L' AWS IoT Events architecture actuelle ingère les données par le biais d'une AWS IoT Core règle et de l'BatchPutMessage
API. Cette architecture est utilisée AWS IoT Core pour l'ingestion de données et la publication d'événements, les messages étant acheminés via des AWS IoT Events entrées vers des modèles de détecteurs qui définissent la logique de l'état. Un rôle IAM gère les autorisations nécessaires.
La nouvelle solution prend en charge AWS IoT Core l'ingestion de données (désormais avec des rubriques MQTT dédiées aux entrées et aux sorties). Il introduit Kinesis Data Streams pour le partitionnement des données et une fonction Lambda d'évaluation pour la logique des états. Les états des appareils sont désormais stockés dans une table DynamoDB, et un rôle IAM amélioré gère les autorisations sur ces services.
Objectif | Solution | Différences |
---|---|---|
Ingestion de données — Reçoit les données des appareils IoT |
AWS IoT Core |
Nécessite désormais deux rubriques MQTT distinctes : l'une pour l'ingestion des données du périphérique et l'autre pour la publication des événements de sortie |
Orientation des messages — Achemine les messages entrants vers les services appropriés |
AWS IoT Core règle de routage des messages |
Conserve les mêmes fonctionnalités de routage, mais dirige désormais les messages vers Kinesis Data Streams au lieu de AWS IoT Events |
Traitement des données — Gère et organise les flux de données entrants |
Kinesis Data Streams |
Remplace la fonctionnalité de AWS IoT Events saisie, permettant l'ingestion des données avec le partitionnement des identifiants des appareils pour le traitement des messages |
Évaluation logique — Traite les changements d'état et déclenche des actions |
Évaluateur Lambda |
Remplace le modèle AWS IoT Events de détecteur, fournissant une évaluation de la logique d'état personnalisable par le biais du code au lieu d'un flux de travail visuel |
Gestion des états — Maintient l'état des appareils |
Tableau DynamoDB |
Nouveau composant qui fournit un stockage permanent des états des appareils, remplaçant ainsi la gestion interne des AWS IoT Events états |
Sécurité — Gère les autorisations de service |
Rôle IAM |
Les autorisations mises à jour incluent désormais l'accès à Kinesis Data Streams, à DynamoDB, EventBridge ainsi qu'aux autorisations existantes AWS IoT Core |
Étape 1 : Exporter les configurations des modèles de AWS IoT Events détecteurs (facultatif)
Avant de créer de nouvelles ressources, exportez les définitions AWS IoT Events de vos modèles de détecteurs. Ils contiennent votre logique de traitement des événements et peuvent servir de référence historique pour la mise en œuvre de votre nouvelle solution.
Étape 2 : Créer un rôle IAM
Créez un rôle IAM pour fournir des autorisations permettant de répliquer les fonctionnalités de. AWS IoT Events Dans cet exemple, le rôle donne accès à DynamoDB pour la gestion des états, pour la planification EventBridge, à Kinesis Data Streams pour l' AWS IoT Core ingestion de données, pour la publication de messages et pour la journalisation. CloudWatch Ensemble, ces services remplaceront AWS IoT Events.
-
Créez un rôle IAM avec les autorisations suivantes. Pour des instructions plus détaillées sur la création d'un rôle IAM, voir Créer un rôle pour déléguer des autorisations à un AWS service dans le Guide de l'utilisateur IAM.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
Ajoutez la politique de confiance des rôles IAM suivante. Une politique de confiance permet aux AWS services spécifiés d'assumer le rôle IAM afin qu'ils puissent effectuer les actions nécessaires. Pour des instructions plus détaillées sur la création d'une politique de confiance IAM, voir Création d'un rôle à l'aide de politiques de confiance personnalisées dans le Guide de l'utilisateur IAM.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
Étape 3 : créer Amazon Kinesis Data Streams
Créez Amazon Kinesis Data Streams à l'aide AWS Management Console du AWS CLI ou.
Étape 4 : Création ou mise à jour de la règle de routage des messages MQTT
Vous pouvez créer une nouvelle règle de routage des messages MQTT ou mettre à jour une règle existante.
Étape 5 : Obtenir le point de terminaison pour le sujet MQTT de destination
Utilisez la rubrique MQTT de destination pour configurer l'endroit où vos rubriques publient les messages sortants, en remplacement de la fonctionnalité précédemment gérée par AWS IoT Events. Le point de terminaison est propre à votre AWS compte et à votre région.
Étape 6 : Création d'une table Amazon DynamoDB
Une table Amazon DynamoDB remplace la fonctionnalité de gestion d'état AWS IoT Events de, fournissant un moyen évolutif et flexible de conserver et de gérer l'état de vos appareils ainsi que la logique du modèle de détecteur dans votre nouvelle architecture de solution.
Étape 7 : Création d'une AWS Lambda fonction (console)
La fonction Lambda sert de moteur de traitement principal, remplaçant la logique d'évaluation du modèle de détecteur de. AWS IoT Events Dans cet exemple, nous nous intégrons à d'autres AWS services pour gérer les données entrantes, gérer l'état et déclencher des actions en fonction des règles que vous avez définies.
Créez une fonction Lambda avec NodeJS runtime. Utilisez l'extrait de code suivant pour remplacer les constantes codées en dur :
-
Ouvrez la AWS Lambda console
. -
Choisissez Créer une fonction.
-
Entrez un nom pour le nom de la fonction.
-
Sélectionnez NodeJS 22.x comme environnement d'exécution.
-
Dans la liste déroulante Modifier le rôle d'exécution par défaut, choisissez Utiliser le rôle existant, puis sélectionnez le rôle IAM que vous avez créé lors des étapes précédentes.
-
Choisissez Créer une fonction.
-
Collez l'extrait de code suivant après avoir remplacé les constantes codées en dur.
-
Une fois votre fonction créée, sous l'onglet Code, collez l'exemple de code suivant, en remplaçant le
your-destination-endpoint
point de terminaison par le vôtre.
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
Étape 8 : ajouter un déclencheur Amazon Kinesis Data Streams
Ajoutez un déclencheur Kinesis Data Streams à la fonction Lambda à l'aide du bouton ou. AWS Management Console AWS CLI
L'ajout d'un déclencheur Kinesis Data Streams à votre fonction Lambda établit le lien entre votre pipeline d'ingestion de données et votre logique de traitement, ce qui lui permet d'évaluer automatiquement les flux de données IoT entrants et de réagir aux événements en temps réel, de la même manière que le traitement des entrées. AWS IoT Events
Étape 9 : Tester les fonctionnalités d'ingestion et de sortie des données (AWS CLI)
Publiez une charge utile dans la rubrique MQTT en fonction de ce que vous avez défini dans votre modèle de détecteur. Voici un exemple de charge utile pour la rubrique MQTT your-topic-name
pour tester une implémentation.
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
Vous devriez voir un message MQTT publié sur un sujet avec le contenu suivant (ou similaire) :
{ "state": "alarm detected, timer started" }