Hinweis zum Ende des Supports: Am 20. Mai 2026 AWS endet der Support für AWS IoT Events. Nach dem 20. Mai 2026 können Sie nicht mehr auf die AWS IoT Events Konsole oder AWS IoT Events die Ressourcen zugreifen. Weitere Informationen finden Sie unter AWS IoT Events Ende des Supports.
Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Migrationsverfahren für Detektormodelle in AWS IoT Events
In diesem Abschnitt werden alternative Lösungen beschrieben, die ähnliche Funktionen des Detektormodells bieten, wenn Sie von wechseln AWS IoT Events.
Sie können die Datenaufnahme über AWS IoT Core Regeln auf eine Kombination anderer AWS Dienste migrieren. Anstatt die Daten über die BatchPutMessageAPI aufzunehmen, können die Daten an das MQTT-Thema weitergeleitet werden. AWS IoT Core
Dieser Migrationsansatz nutzt AWS IoT Core MQTT-Themen als Einstiegspunkt für Ihre IoT-Daten und ersetzt die direkte Eingabe zu. AWS IoT Events MQTT-Themen werden aus mehreren wichtigen Gründen ausgewählt. Sie bieten aufgrund der weit verbreiteten Verwendung von MQTT in der Branche eine breite Kompatibilität mit IoT-Geräten. Diese Themen können große Mengen an Nachrichten von zahlreichen Geräten verarbeiten und gewährleisten so die Skalierbarkeit. Sie bieten auch Flexibilität beim Routing und Filtern von Nachrichten nach Inhalt oder Gerätetyp. Darüber hinaus lassen sich AWS IoT Core MQTT-Themen nahtlos in andere AWS Dienste integrieren, was den Migrationsprozess erleichtert.
Daten fließen von MQTT-Themen in eine Architektur, die Amazon Kinesis Data Streams, eine AWS Lambda Funktion, eine Amazon DynamoDB-Tabelle und Amazon-Zeitpläne kombiniert. EventBridge Diese Kombination von Diensten repliziert und erweitert die Funktionen, die zuvor von bereitgestellt wurden AWS IoT Events, und bietet Ihnen mehr Flexibilität und Kontrolle über Ihre IoT-Datenverarbeitungspipeline.
Architekturen im Vergleich
Die aktuelle AWS IoT Events Architektur nimmt Daten über eine AWS IoT Core Regel und die BatchPutMessage
API auf. Diese Architektur wird AWS IoT Core für die Datenaufnahme und die Veröffentlichung von Ereignissen verwendet, wobei Nachrichten über AWS IoT Events Eingänge an Detektormodelle weitergeleitet werden, die die Zustandslogik definieren. Eine IAM-Rolle verwaltet die erforderlichen Berechtigungen.
Die neue Lösung kümmert sich AWS IoT Core um die Datenaufnahme (jetzt mit speziellen MQTT-Themen für Eingabe und Ausgabe). Es führt Kinesis Data Streams für die Datenpartitionierung und eine Evaluator-Lambda-Funktion für die Zustandslogik ein. Gerätestatus werden jetzt in einer DynamoDB-Tabelle gespeichert, und eine erweiterte IAM-Rolle verwaltet die Berechtigungen für diese Dienste.
Zweck | Lösung | Unterschiede |
---|---|---|
Datenaufnahme — Empfängt Daten von IoT-Geräten |
AWS IoT Core |
Erfordert jetzt zwei unterschiedliche MQTT-Themen: eines für die Erfassung von Gerätedaten und eines für die Veröffentlichung von Ausgabeereignissen |
Nachrichtenrichtung — Leitet eingehende Nachrichten an entsprechende Dienste weiter |
AWS IoT Core Regel für die Nachrichtenweiterleitung |
Behält die gleiche Routing-Funktionalität bei, leitet aber Nachrichten jetzt an Kinesis Data Streams weiter, anstatt AWS IoT Events |
Datenverarbeitung — Verarbeitet und organisiert eingehende Datenströme |
Kinesis Data Streams |
Ersetzt AWS IoT Events Eingabefunktionen und ermöglicht die Datenaufnahme mit Geräte-ID-Partitionierung für die Nachrichtenverarbeitung |
Logikauswertung — Verarbeitet Statusänderungen und löst Aktionen aus |
Evaluator Lambda |
Ersetzt das AWS IoT Events Detektormodell und ermöglicht eine anpassbare Auswertung der Zustandslogik durch Code anstelle eines visuellen Workflows |
Statusverwaltung — Behält den Gerätestatus bei |
DynamoDB-Tabelle |
Neue Komponente, die eine persistente Speicherung von Gerätestatus ermöglicht und die interne AWS IoT Events Statusverwaltung ersetzt |
Sicherheit — Verwaltet Dienstberechtigungen |
IAM-Rolle |
Die aktualisierten Berechtigungen umfassen jetzt zusätzlich zu den vorhandenen Berechtigungen den Zugriff auf Kinesis Data Streams und EventBridge DynamoDB AWS IoT Core |
Schritt 1: (Optional) Exportieren AWS IoT Events Sie die Modellkonfigurationen der Detektoren
Bevor Sie neue Ressourcen erstellen, exportieren Sie Ihre AWS IoT Events Detektormodelldefinitionen. Diese enthalten Ihre Logik zur Ereignisverarbeitung und können als historische Referenz für die Implementierung Ihrer neuen Lösung dienen.
Schritt 2: Erstellen einer IAM-Rolle
Erstellen Sie eine IAM-Rolle, um Berechtigungen zur Replikation der Funktionalität von bereitzustellen. AWS IoT Events Die Rolle in diesem Beispiel gewährt Zugriff auf DynamoDB für die Statusverwaltung, EventBridge für die Planung, Kinesis Data Streams für die Datenaufnahme, für die Veröffentlichung von Nachrichten und AWS IoT Core für die Protokollierung. CloudWatch Zusammen können diese Dienste als Ersatz für verwendet werden. AWS IoT Events
-
Erstellen Sie eine IAM-Rolle mit den folgenden Berechtigungen. Ausführlichere Anweisungen zum Erstellen einer IAM-Rolle finden Sie unter Erstellen einer Rolle zum Delegieren von Berechtigungen für einen AWS Dienst im IAM-Benutzerhandbuch.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
Fügen Sie die folgende Vertrauensrichtlinie für IAM-Rollen hinzu. Eine Vertrauensrichtlinie ermöglicht es den angegebenen AWS Diensten, die IAM-Rolle zu übernehmen, sodass sie die erforderlichen Aktionen ausführen können. Ausführlichere Anweisungen zur Erstellung einer IAM-Vertrauensrichtlinie finden Sie im IAM-Benutzerhandbuch unter Erstellen einer Rolle mithilfe benutzerdefinierter Vertrauensrichtlinien.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
Schritt 3: Amazon Kinesis Data Streams erstellen
Erstellen Sie Amazon Kinesis Data Streams mit dem AWS Management Console oder AWS CLI.
Schritt 4: Erstellen oder aktualisieren Sie die MQTT-Nachrichtenrouting-Regel
Sie können eine neue MQTT-Nachrichtenrouting-Regel erstellen oder eine bestehende Regel aktualisieren.
Schritt 5: Ermitteln Sie den Endpunkt für das MQTT-Zielthema
Verwenden Sie das Ziel-MQTT-Thema, um zu konfigurieren, wo Ihre Themen ausgehende Nachrichten veröffentlichen, und ersetzen Sie damit die Funktionalität, die zuvor von verwendet wurde. AWS IoT Events Der Endpunkt ist für Ihr AWS Konto und Ihre Region einzigartig.
Schritt 6: Erstellen Sie eine Amazon DynamoDB-Tabelle
Eine Amazon DynamoDB-Tabelle ersetzt die Statusverwaltungsfunktion von und bietet eine skalierbare und flexible Möglichkeit AWS IoT Events, den Status Ihrer Geräte und die Detektormodelllogik in Ihrer neuen Lösungsarchitektur beizubehalten und zu verwalten.
Schritt 7: Erstellen Sie eine AWS Lambda Funktion (Konsole)
Die Lambda-Funktion dient als zentrale Verarbeitungs-Engine und ersetzt die Bewertungslogik des Detektormodells von AWS IoT Events. In diesem Beispiel integrieren wir andere AWS Dienste, um eingehende Daten zu verarbeiten, den Status zu verwalten und Aktionen auf der Grundlage Ihrer definierten Regeln auszulösen.
Erstellen Sie eine Lambda-Funktion mit NodeJS Laufzeit. Verwenden Sie den folgenden Codeausschnitt, der die hartcodierten Konstanten ersetzt:
-
Öffnen Sie die AWS Lambda console
. -
Wählen Sie Funktion erstellen.
-
Geben Sie einen Namen für den Funktionsnamen ein.
-
Wählen Sie NodeJS 2.x als Runtime aus.
-
Wählen Sie in der Dropdownliste Standardausführungsrolle ändern die Option Bestehende Rolle verwenden und wählen Sie dann die IAM-Rolle aus, die Sie in den vorherigen Schritten erstellt haben.
-
Wählen Sie Funktion erstellen.
-
Fügen Sie den folgenden Codeausschnitt ein, nachdem Sie die hartcodierten Konstanten ersetzt haben.
-
Fügen Sie nach der Erstellung Ihrer Funktion auf der Registerkarte Code das folgende Codebeispiel ein und ersetzen Sie den
your-destination-endpoint
Endpunkt durch Ihren eigenen.
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
Schritt 8: Einen Amazon Kinesis Data Streams Streams-Trigger hinzufügen
Fügen Sie der Lambda-Funktion mithilfe von oder einen Kinesis Data Streams Streams-Trigger hinzu. AWS Management Console AWS CLI
Durch das Hinzufügen eines Kinesis Data Streams Streams-Triggers zu Ihrer Lambda-Funktion wird die Verbindung zwischen Ihrer Datenerfassungspipeline und Ihrer Verarbeitungslogik hergestellt, sodass eingehende IoT-Datenströme automatisch ausgewertet und in Echtzeit auf Ereignisse reagiert werden kann, ähnlich wie bei der Verarbeitung von Eingaben. AWS IoT Events
Schritt 9: Testen Sie die Funktionen zur Datenaufnahme und -ausgabe ()AWS CLI
Veröffentlichen Sie eine Nutzlast zum MQTT-Thema, die auf dem basiert, was Sie in Ihrem Detektormodell definiert haben. Im Folgenden finden Sie ein Beispiel für eine Payload zum MQTT-Thema your-topic-name
zum Testen einer Implementierung.
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
Sie sollten eine MQTT-Nachricht sehen, die zu einem Thema veröffentlicht wurde und den folgenden (oder ähnlichen) Inhalt hat:
{ "state": "alarm detected, timer started" }