中探测器模型的迁移程序 AWS IoT Events - AWS IoT Events

终止支持通知:2026 年 5 月 20 日, AWS 将终止对的支持。 AWS IoT Events 2026 年 5 月 20 日之后,您将无法再访问 AWS IoT Events 控制台或 AWS IoT Events 资源。有关更多信息,请参阅AWS IoT Events 终止支持

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

中探测器模型的迁移程序 AWS IoT Events

本节介绍替代解决方案,这些解决方案可在您迁移到其他探测器模型时提供类似的探测器模型功能 AWS IoT Events。

您可以通过 AWS IoT Core 规则将数据摄取迁移到其他 AWS 服务的组合。可以将数据路由到 MQT BatchPutMessageT 主题,而不是通过 API 获取数据。 AWS IoT Core

这种迁移方法利用 AWS IoT Core MQTT 主题作为物联网数据的入口点,取代了直接输入。 AWS IoT Events选择 MQTT 主题有几个关键原因。由于 MQTT 在行业中的广泛使用,它们与物联网设备具有广泛的兼容性。这些主题可以处理来自众多设备的大量消息,从而确保可扩展性。它们还允许根据内容或设备类型灵活地路由和筛选消息。此外, AWS IoT Core MQTT 主题与其他 AWS 服务无缝集成,从而简化了迁移过程。

数据从 MQTT 主题流入一个架构,该架构结合了 Amazon Kinesis Data Streams、 AWS Lambda 一个函数、一个亚马逊 DynamoDB 表和亚马逊计划。 EventBridge 这种服务组合复制并增强了以前提供的功能 AWS IoT Events,使您可以更加灵活地控制物联网数据处理管道。

比较架构

当前 AWS IoT Events 架构通过 AWS IoT Core 规则和 BatchPutMessage API 提取数据。该架构 AWS IoT Core 用于数据摄取和事件发布,消息通过 AWS IoT Events 输入路由到定义状态逻辑的探测器模型。IAM 角色管理必要的权限。

新的解决方案 AWS IoT Core 用于数据摄取(现在有专门的输入和输出 MQTT 主题)。它引入了用于数据分区的 Kinesis Data Streams 和用于状态逻辑的评估器 Lambda 函数。设备状态现在存储在 DynamoDB 表中,增强的 IAM 角色管理这些服务的权限。

用途 解决方案 差异

数据摄取 — 从物联网设备接收数据

AWS IoT Core

现在需要两个不同的 MQTT 主题:一个用于摄取设备数据,另一个用于发布输出事件

消息方向-将传入的消息路由到相应的服务

AWS IoT Core 邮件路由规则

保持相同的路由功能,但现在将消息定向到 Kinesis Data Streams 而不是 AWS IoT Events

数据处理-处理和组织传入的数据流

Kinesis Data Streams

取代 AWS IoT Events 输入功能,使用设备 ID 分区提供数据摄取,用于消息处理

逻辑评估-处理状态变化并触发操作

评估者 Lambda

取代 AWS IoT Events 探测器模型,通过代码而不是可视化工作流程提供可自定义的状态逻辑评估

状态管理-维护设备状态

DynamoDB 表

提供设备状态永久存储的新组件,取代了内部 AWS IoT Events 状态管理

安全-管理服务权限

IAM 角色

更新后的权限现在包括访问 Kinesis Data Streams、DynamoDB 以及 EventBridge 现有权限 AWS IoT Core

步骤 1:(可选)导出 AWS IoT Events 探测器型号配置

在创建新资源之前,请导出您的 AWS IoT Events 探测器模型定义。它们包含您的事件处理逻辑,可以作为实施新解决方案的历史参考。

Console

使用执行以下步骤以导出您的探测器模型配置: AWS IoT Events AWS Management Console

要使用导出探测器模型 AWS Management Console
  1. 登录 AWS IoT Events 控制台

  2. 在左侧导航窗格中,选择探测器模型

  3. 选择要导出的探测器型号。

  4. 选择导出。阅读有关输出的信息消息,然后再次选择 “出”。

  5. 对要导出的每个探测器模型重复该过程。

包含探测器模型的 JSON 输出的文件已添加到浏览器的下载文件夹中。您可以选择保存每个探测器模型配置以保留历史数据。

AWS CLI

使用运行以下命令导出您的探测器模型配置: AWS CLI

要使用导出探测器模型 AWS CLI
  1. 列出您账户中的所有探测器型号:

    aws iotevents list-detector-models
  2. 对于每个探测器型号,通过运行以下命令导出其配置:

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. 保存每个探测器型号的输出。

步骤 2:创建 IAM 角色

创建 IAM 角色以提供复制功能的权限 AWS IoT Events。本示例中的角色授予访问 DynamoDB 以进行状态管理 EventBridge、调度、Kinesis Data Streams 用于数据 AWS IoT Core 摄取、发布消息和记录的权限。 CloudWatch 这些服务共同起到替代作用 AWS IoT Events。

  1. 创建具有以下权限的 IAM 角色。有关创建 IAM 角色的更多详细说明,请参阅 I A M 用户指南中的创建角色以向 AWS 服务委派权限

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. 添加以下 IAM 角色信任策略。信任策略允许指定的 AWS 服务担任 IAM 角色,以便它们可以执行必要的操作。有关创建 IAM 信任策略的更多详细说明,请参阅 I AM 用户指南中的使用自定义信任策略创建角色

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

第 3 步:创建 Amazon Kinesis Data Streams

使用或创建 Amazon Kinesis Data Streams AWS Management Console 。 AWS CLI

Console

要使用创建 Kinesis 数据流 AWS Management Console,请按照《Amazon Kinesis Data Streams 开发者指南》中创建数据页面上的步骤进行操作。

根据您的设备数量和消息负载大小调整分片计数。

AWS CLI

使用 AWS CLI创建 Amazon Kinesis Data Streams,从您的设备中提取和分区数据。

在本次迁移中使用 Kinesis Data Streams 来取代的数据提取功能。 AWS IoT Events它提供了一种可扩展且高效的方法来收集、处理和分析来自物联网设备的实时流数据,同时还提供灵活的数据处理以及与其他 AWS 服务的集成。

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

根据您的设备数量和消息负载大小调整分片计数。

步骤 4:创建或更新 MQTT 消息路由规则

您可以创建新的 MQTT 消息路由规则或更新现有规则。

Console
  1. 确定是否需要新的 MQTT 消息路由规则,或者是否可以更新现有规则。

  2. 打开 AWS IoT Core 管理控制台

  3. 在导航窗格中,选择 “消息路由”,然后选择 “规则”。

  4. 管理部分,选择邮件路由,然后选择规则

  5. 选择创建规则

  6. 指定规则属性页面上,输入 AWS IoT Core 规则名称的规则名称。在 “规则描述-可选” 中,输入描述以标识您正在处理事件并将其转发到 Kinesis Data Stre ams。

  7. 在 “配置 SQL 语句” 页上,为 SQL 语句输入以下内容:SELECT * FROM 'your-database',然后选择下一步

  8. 附加规则操作页面和规则操作下,选择 k inesis

  9. 为直播选择你的 Kinesis 直播。对于分区键,输入 your-instance-id。为 IAM 角色选择相应的角色,然后选择添加规则操作

有关更多信息,请参阅创建 AWS IoT 规则以将设备数据路由到其他服务

AWS CLI
  1. 使用以下内容创建 JSON 文件 。此 JSON 配置文件定义了一 AWS IoT Core 条规则,该规则使用实例 ID 作为分区键,从主题中选择所有消息并将其转发到指定的 Kinesis 流。

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. 使用创建 MQTT 主题规则。 AWS CLI此步骤使用使用events_rule.json文件中定义的配置创建 AWS IoT Core 主题规则。 AWS CLI

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

步骤 5:获取目标 MQTT 主题的终端节点

使用目标 MQTT 主题配置您的主题发布传出消息的位置,取代之前由 AWS IoT Events处理的功能。终端节点是您的 AWS 账户和地区所独有的。

Console
  1. 打开 AWS IoT Core 管理控制台

  2. 在左侧导航面板的 Connect 部分,选择域配置

  3. 选择 IOT: data-ATS 域配置以打开配置的详细信息页面。

  4. 复制域名值。此值是终端节点。保存终端节点值,因为您将在以后的步骤中使用它。

AWS CLI

运行以下命令以获取用于发布账户传出消息的 AWS IoT Core 终端节点。

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

第 6 步:创建亚马逊 DynamoDB 表

Amazon DynamoDB 表取代了 AWS IoT Events的状态管理功能,为在新解决方案架构中保留和管理设备状态和探测器模型逻辑提供了一种可扩展且灵活的方式。

Console

创建 Amazon DynamoDB 表以保留探测器模型的状态。有关更多信息,请参阅亚马逊 DynamoDB 开发者指南中的在 DynamoDB 中创建表

使用以下内容获取表格详细信息:

  • 表名中,输入您选择的表名。

  • 对于分区密钥,请输入您自己的实例 ID。

  • 您可以使用 “表格” 设置的 “默认” 设置

AWS CLI

运行以下命令创建 DynamoDB 表。

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

步骤 7:创建 AWS Lambda 函数(控制台)

Lambda 函数用作核心处理引擎,取代了的探测器模型评估逻辑。 AWS IoT Events在示例中,我们与其他 AWS 服务集成,以根据您定义的规则处理传入数据、管理状态和触发操作。

使用运行时创建一个 Lambda 函数。NodeJS使用以下代码片段,替换硬编码的常量:

  1. 打开 AWS Lambda console

  2. 选择创建函数

  3. 输入函数名称的名称

  4. 选择 NodeJS 22.x 作为运行时间。

  5. 更改默认执行角色下拉列表中,选择使用现有角色,然后选择您在之前的步骤中创建的 IAM 角色。

  6. 选择创建函数

  7. 替换硬编码常量后,粘贴以下代码片段。

  8. 函数创建后,在 “代码” 选项卡下粘贴以下代码示例,将端点替换为自己的your-destination-endpoint端点。

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

第 8 步:添加 Amazon Kinesis Data Streams 触发器

使用或将 Kinesis Data Streams 触发器添加到 Lambda 函数中。 AWS Management Console AWS CLI

在您的 Lambda 函数中添加 Kinesis Data Streams 触发器可在您的数据摄取管道和处理逻辑之间建立连接,使其能够自动评估传入的物联网数据流并实时对事件做出反应,类似于处理输入的方式。 AWS IoT Events

Console

有关更多信息,请参阅AWS Lambda 开发人员指南中的创建事件源映射以调用 Lambda 函数

使用以下内容了解事件源映射的详细信息:

AWS CLI

运行以下命令创建 Lambda 函数触发器。

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

步骤 9:测试数据摄取和输出功能 ()AWS CLI

根据您在探测器模型中定义的内容向 MQTT 主题发布有效负载。以下是 MQTT 主题的示例负载your-topic-name,用于测试实现。

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

您应该会看到一条发布到主题的 MQTT 消息,其中包含以下(或类似)内容:

{ "state": "alarm detected, timer started" }