Migrationsverfahren für Detektormodelle in AWS IoT Events - AWS IoT Events

Hinweis zum Ende des Supports: Am 20. Mai 2026 AWS endet der Support für AWS IoT Events. Nach dem 20. Mai 2026 können Sie nicht mehr auf die AWS IoT Events Konsole oder AWS IoT Events die Ressourcen zugreifen. Weitere Informationen finden Sie unter AWS IoT Events Ende des Supports.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Migrationsverfahren für Detektormodelle in AWS IoT Events

In diesem Abschnitt werden alternative Lösungen beschrieben, die ähnliche Funktionen des Detektormodells bieten, wenn Sie von wechseln AWS IoT Events.

Sie können die Datenaufnahme über AWS IoT Core Regeln auf eine Kombination anderer AWS Dienste migrieren. Anstatt die Daten über die BatchPutMessageAPI aufzunehmen, können die Daten an das MQTT-Thema weitergeleitet werden. AWS IoT Core

Dieser Migrationsansatz nutzt AWS IoT Core MQTT-Themen als Einstiegspunkt für Ihre IoT-Daten und ersetzt die direkte Eingabe zu. AWS IoT Events MQTT-Themen werden aus mehreren wichtigen Gründen ausgewählt. Sie bieten aufgrund der weit verbreiteten Verwendung von MQTT in der Branche eine breite Kompatibilität mit IoT-Geräten. Diese Themen können große Mengen an Nachrichten von zahlreichen Geräten verarbeiten und gewährleisten so die Skalierbarkeit. Sie bieten auch Flexibilität beim Routing und Filtern von Nachrichten nach Inhalt oder Gerätetyp. Darüber hinaus lassen sich AWS IoT Core MQTT-Themen nahtlos in andere AWS Dienste integrieren, was den Migrationsprozess erleichtert.

Daten fließen von MQTT-Themen in eine Architektur, die Amazon Kinesis Data Streams, eine AWS Lambda Funktion, eine Amazon DynamoDB-Tabelle und Amazon-Zeitpläne kombiniert. EventBridge Diese Kombination von Diensten repliziert und erweitert die Funktionen, die zuvor von bereitgestellt wurden AWS IoT Events, und bietet Ihnen mehr Flexibilität und Kontrolle über Ihre IoT-Datenverarbeitungspipeline.

Architekturen im Vergleich

Die aktuelle AWS IoT Events Architektur nimmt Daten über eine AWS IoT Core Regel und die BatchPutMessage API auf. Diese Architektur wird AWS IoT Core für die Datenaufnahme und die Veröffentlichung von Ereignissen verwendet, wobei Nachrichten über AWS IoT Events Eingänge an Detektormodelle weitergeleitet werden, die die Zustandslogik definieren. Eine IAM-Rolle verwaltet die erforderlichen Berechtigungen.

Die neue Lösung kümmert sich AWS IoT Core um die Datenaufnahme (jetzt mit speziellen MQTT-Themen für Eingabe und Ausgabe). Es führt Kinesis Data Streams für die Datenpartitionierung und eine Evaluator-Lambda-Funktion für die Zustandslogik ein. Gerätestatus werden jetzt in einer DynamoDB-Tabelle gespeichert, und eine erweiterte IAM-Rolle verwaltet die Berechtigungen für diese Dienste.

Zweck Lösung Unterschiede

Datenaufnahme — Empfängt Daten von IoT-Geräten

AWS IoT Core

Erfordert jetzt zwei unterschiedliche MQTT-Themen: eines für die Erfassung von Gerätedaten und eines für die Veröffentlichung von Ausgabeereignissen

Nachrichtenrichtung — Leitet eingehende Nachrichten an entsprechende Dienste weiter

AWS IoT Core Regel für die Nachrichtenweiterleitung

Behält die gleiche Routing-Funktionalität bei, leitet aber Nachrichten jetzt an Kinesis Data Streams weiter, anstatt AWS IoT Events

Datenverarbeitung — Verarbeitet und organisiert eingehende Datenströme

Kinesis Data Streams

Ersetzt AWS IoT Events Eingabefunktionen und ermöglicht die Datenaufnahme mit Geräte-ID-Partitionierung für die Nachrichtenverarbeitung

Logikauswertung — Verarbeitet Statusänderungen und löst Aktionen aus

Evaluator Lambda

Ersetzt das AWS IoT Events Detektormodell und ermöglicht eine anpassbare Auswertung der Zustandslogik durch Code anstelle eines visuellen Workflows

Statusverwaltung — Behält den Gerätestatus bei

DynamoDB-Tabelle

Neue Komponente, die eine persistente Speicherung von Gerätestatus ermöglicht und die interne AWS IoT Events Statusverwaltung ersetzt

Sicherheit — Verwaltet Dienstberechtigungen

IAM-Rolle

Die aktualisierten Berechtigungen umfassen jetzt zusätzlich zu den vorhandenen Berechtigungen den Zugriff auf Kinesis Data Streams und EventBridge DynamoDB AWS IoT Core

Schritt 1: (Optional) Exportieren AWS IoT Events Sie die Modellkonfigurationen der Detektoren

Bevor Sie neue Ressourcen erstellen, exportieren Sie Ihre AWS IoT Events Detektormodelldefinitionen. Diese enthalten Ihre Logik zur Ereignisverarbeitung und können als historische Referenz für die Implementierung Ihrer neuen Lösung dienen.

Console

Führen Sie mithilfe von die folgenden Schritte aus AWS IoT Events AWS Management Console, um Ihre Meldermodellkonfigurationen zu exportieren:

Um Detektormodelle zu exportieren, verwenden Sie AWS Management Console
  1. Melden Sie sich an der AWS IoT Events -Konsole an.

  2. Wählen Sie im linken Navigationsbereich Detector models (Detektormodelle).

  3. Wählen Sie das zu exportierende Detektormodell aus.

  4. Wählen Sie Export aus. Lesen Sie die Informationsmeldung zur Ausgabe und wählen Sie dann erneut Exportieren.

  5. Wiederholen Sie den Vorgang für jedes Meldermodell, das Sie exportieren möchten.

Eine Datei, die eine JSON-Ausgabe Ihres Detektormodells enthält, wird dem Download-Ordner Ihres Browsers hinzugefügt. Sie können optional jede Konfiguration des Detektormodells speichern, um historische Daten beizubehalten.

AWS CLI

Führen Sie mit dem die folgenden Befehle aus AWS CLI, um Ihre Detektormodellkonfigurationen zu exportieren:

Um Detektormodelle zu exportieren mit AWS CLI
  1. Alle Meldermodelle in Ihrem Konto auflisten:

    aws iotevents list-detector-models
  2. Exportieren Sie für jedes Meldermodell dessen Konfiguration, indem Sie den folgenden Befehl ausführen:

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. Speichern Sie die Ausgabe für jedes Detektormodell.

Schritt 2: Erstellen einer IAM-Rolle

Erstellen Sie eine IAM-Rolle, um Berechtigungen zur Replikation der Funktionalität von bereitzustellen. AWS IoT Events Die Rolle in diesem Beispiel gewährt Zugriff auf DynamoDB für die Statusverwaltung, EventBridge für die Planung, Kinesis Data Streams für die Datenaufnahme, für die Veröffentlichung von Nachrichten und AWS IoT Core für die Protokollierung. CloudWatch Zusammen können diese Dienste als Ersatz für verwendet werden. AWS IoT Events

  1. Erstellen Sie eine IAM-Rolle mit den folgenden Berechtigungen. Ausführlichere Anweisungen zum Erstellen einer IAM-Rolle finden Sie unter Erstellen einer Rolle zum Delegieren von Berechtigungen für einen AWS Dienst im IAM-Benutzerhandbuch.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. Fügen Sie die folgende Vertrauensrichtlinie für IAM-Rollen hinzu. Eine Vertrauensrichtlinie ermöglicht es den angegebenen AWS Diensten, die IAM-Rolle zu übernehmen, sodass sie die erforderlichen Aktionen ausführen können. Ausführlichere Anweisungen zur Erstellung einer IAM-Vertrauensrichtlinie finden Sie im IAM-Benutzerhandbuch unter Erstellen einer Rolle mithilfe benutzerdefinierter Vertrauensrichtlinien.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

Schritt 3: Amazon Kinesis Data Streams erstellen

Erstellen Sie Amazon Kinesis Data Streams mit dem AWS Management Console oder AWS CLI.

Console

Um einen Kinesis-Datenstream mit dem zu erstellen AWS Management Console, folgen Sie dem Verfahren auf der Seite Einen Datenstream erstellen im Amazon Kinesis Data Streams Developer Guide.

Passen Sie die Anzahl der Shards an Ihre Geräteanzahl und die Größe der Nachrichtennutzlast an.

AWS CLI

Erstellen Sie mithilfe von Amazon Kinesis Data Streams AWS CLI, um die Daten von Ihren Geräten aufzunehmen und zu partitionieren.

Kinesis Data Streams werden bei dieser Migration verwendet, um die Datenaufnahmefunktion von zu ersetzen. AWS IoT Events Es bietet eine skalierbare und effiziente Methode zur Erfassung, Verarbeitung und Analyse von Echtzeit-Streaming-Daten von Ihren IoT-Geräten und bietet gleichzeitig eine flexible Datenverarbeitung und Integration mit anderen AWS Diensten.

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

Passen Sie die Anzahl der Shards an die Anzahl Ihrer Geräte und die Größe der Nachrichtennutzlast an.

Schritt 4: Erstellen oder aktualisieren Sie die MQTT-Nachrichtenrouting-Regel

Sie können eine neue MQTT-Nachrichtenrouting-Regel erstellen oder eine bestehende Regel aktualisieren.

Console
  1. Stellen Sie fest, ob Sie eine neue MQTT-Nachrichtenrouting-Regel benötigen oder ob Sie eine bestehende Regel aktualisieren können.

  2. Öffnen Sie die AWS IoT Core -Konsole.

  3. Wählen Sie im Navigationsbereich Message Routing und dann Regeln aus.

  4. Wählen Sie im Abschnitt Verwalten die Option Nachrichtenweiterleitung und dann Regeln aus.

  5. Wählen Sie Regel erstellen aus.

  6. Geben Sie auf der Seite Regeleigenschaften angeben den AWS IoT Core Regelnamen für Regelname ein. Geben Sie unter Regelbeschreibung — optional eine Beschreibung ein, um zu kennzeichnen, dass Sie Ereignisse verarbeiten und an Kinesis Data Streams weiterleiten.

  7. Geben Sie auf der Seite „SQL-Anweisung konfigurieren“ Folgendes für die SQL-Anweisung ein: SELECT * FROM 'your-database' und wählen Sie dann Weiter.

  8. Wählen Sie auf der Seite „Regeln anhängen“ und unter Regelaktionen die Option Kinesis aus.

  9. Wählen Sie Ihren Kinesis-Stream für den Stream aus. Geben Sie als Partitionsschlüssel your-instance-id ein. Wählen Sie die entsprechende Rolle für die IAM-Rolle aus und klicken Sie dann auf Regelaktion hinzufügen.

Weitere Informationen finden Sie unter AWS IoT-Regeln erstellen, um Gerätedaten an andere Dienste weiterzuleiten.

AWS CLI
  1. Erstellen Sie eine JSON Datei mit dem Namen und dem folgenden Inhalt. Diese JSON-Konfigurationsdatei definiert eine AWS IoT Core Regel, die alle Nachrichten aus einem Thema auswählt und sie an den angegebenen Kinesis-Stream weiterleitet, wobei die Instanz-ID als Partitionsschlüssel verwendet wird.

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. Erstellen Sie die MQTT-Themenregel mit dem. AWS CLI In diesem Schritt wird mithilfe der AWS CLI eine AWS IoT Core Themenregel unter Verwendung der in der events_rule.json Datei definierten Konfiguration erstellt.

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

Schritt 5: Ermitteln Sie den Endpunkt für das MQTT-Zielthema

Verwenden Sie das Ziel-MQTT-Thema, um zu konfigurieren, wo Ihre Themen ausgehende Nachrichten veröffentlichen, und ersetzen Sie damit die Funktionalität, die zuvor von verwendet wurde. AWS IoT Events Der Endpunkt ist für Ihr AWS Konto und Ihre Region einzigartig.

Console
  1. Öffnen Sie die AWS IoT Core -Konsole.

  2. Wählen Sie im linken Navigationsbereich im Bereich Connect die Option Domain-Konfiguration aus.

  3. Wählen Sie die Domainkonfiguration IoT:Data-ATS, um die Detailseite der Konfiguration zu öffnen.

  4. Kopieren Sie den Wert für den Domainnamen. Dieser Wert ist der Endpunkt. Speichern Sie den Endpunktwert, da Sie ihn in späteren Schritten benötigen werden.

AWS CLI

Führen Sie den folgenden Befehl aus, um den AWS IoT Core Endpunkt für die Veröffentlichung ausgehender Nachrichten für Ihr Konto zu ermitteln.

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

Schritt 6: Erstellen Sie eine Amazon DynamoDB-Tabelle

Eine Amazon DynamoDB-Tabelle ersetzt die Statusverwaltungsfunktion von und bietet eine skalierbare und flexible Möglichkeit AWS IoT Events, den Status Ihrer Geräte und die Detektormodelllogik in Ihrer neuen Lösungsarchitektur beizubehalten und zu verwalten.

Console

Erstellen Sie eine Amazon DynamoDB-Tabelle, um den Status der Detektormodelle beizubehalten. Weitere Informationen finden Sie unter Erstellen einer Tabelle in DynamoDB im Amazon DynamoDB Developer Guide.

Verwenden Sie Folgendes für die Tabellendetails:

  • Geben Sie unter Tabellenname einen Tabellennamen Ihrer Wahl ein.

  • Geben Sie für Partitionsschlüssel Ihre eigene Instanz-ID ein.

  • Sie können die Standardeinstellungen für die Tabelleneinstellungen verwenden

AWS CLI

Führen Sie den folgenden Befehl aus, um eine DynamoDB-Tabelle zu erstellen.

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

Schritt 7: Erstellen Sie eine AWS Lambda Funktion (Konsole)

Die Lambda-Funktion dient als zentrale Verarbeitungs-Engine und ersetzt die Bewertungslogik des Detektormodells von AWS IoT Events. In diesem Beispiel integrieren wir andere AWS Dienste, um eingehende Daten zu verarbeiten, den Status zu verwalten und Aktionen auf der Grundlage Ihrer definierten Regeln auszulösen.

Erstellen Sie eine Lambda-Funktion mit NodeJS Laufzeit. Verwenden Sie den folgenden Codeausschnitt, der die hartcodierten Konstanten ersetzt:

  1. Öffnen Sie die AWS Lambda console.

  2. Wählen Sie Funktion erstellen.

  3. Geben Sie einen Namen für den Funktionsnamen ein.

  4. Wählen Sie NodeJS 2.x als Runtime aus.

  5. Wählen Sie in der Dropdownliste Standardausführungsrolle ändern die Option Bestehende Rolle verwenden und wählen Sie dann die IAM-Rolle aus, die Sie in den vorherigen Schritten erstellt haben.

  6. Wählen Sie Funktion erstellen.

  7. Fügen Sie den folgenden Codeausschnitt ein, nachdem Sie die hartcodierten Konstanten ersetzt haben.

  8. Fügen Sie nach der Erstellung Ihrer Funktion auf der Registerkarte Code das folgende Codebeispiel ein und ersetzen Sie den your-destination-endpoint Endpunkt durch Ihren eigenen.

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

Schritt 8: Einen Amazon Kinesis Data Streams Streams-Trigger hinzufügen

Fügen Sie der Lambda-Funktion mithilfe von oder einen Kinesis Data Streams Streams-Trigger hinzu. AWS Management Console AWS CLI

Durch das Hinzufügen eines Kinesis Data Streams Streams-Triggers zu Ihrer Lambda-Funktion wird die Verbindung zwischen Ihrer Datenerfassungspipeline und Ihrer Verarbeitungslogik hergestellt, sodass eingehende IoT-Datenströme automatisch ausgewertet und in Echtzeit auf Ereignisse reagiert werden kann, ähnlich wie bei der Verarbeitung von Eingaben. AWS IoT Events

Console

Weitere Informationen finden Sie unter Erstellen einer Ereignisquellenzuordnung zum Aufrufen einer Lambda-Funktion im AWS Lambda Entwicklerhandbuch.

Einzelheiten zur Zuordnung der Ereignisquellen finden Sie wie folgt:

AWS CLI

Führen Sie den folgenden Befehl aus, um den Lambda-Funktionstrigger zu erstellen.

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

Schritt 9: Testen Sie die Funktionen zur Datenaufnahme und -ausgabe ()AWS CLI

Veröffentlichen Sie eine Nutzlast zum MQTT-Thema, die auf dem basiert, was Sie in Ihrem Detektormodell definiert haben. Im Folgenden finden Sie ein Beispiel für eine Payload zum MQTT-Thema your-topic-name zum Testen einer Implementierung.

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

Sie sollten eine MQTT-Nachricht sehen, die zu einem Thema veröffentlicht wurde und den folgenden (oder ähnlichen) Inhalt hat:

{ "state": "alarm detected, timer started" }