终止支持通知:2026 年 5 月 20 日, AWS 将终止对的支持。 AWS IoT Events 2026 年 5 月 20 日之后,您将无法再访问 AWS IoT Events 控制台或 AWS IoT Events 资源。有关更多信息,请参阅AWS IoT Events 终止支持。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
中探测器模型的迁移程序 AWS IoT Events
本节介绍替代解决方案,这些解决方案可在您迁移到其他探测器模型时提供类似的探测器模型功能 AWS IoT Events。
您可以通过 AWS IoT Core 规则将数据摄取迁移到其他 AWS 服务的组合。可以将数据路由到 MQT BatchPutMessageT 主题,而不是通过 API 获取数据。 AWS IoT Core
这种迁移方法利用 AWS IoT Core MQTT 主题作为物联网数据的入口点,取代了直接输入。 AWS IoT Events选择 MQTT 主题有几个关键原因。由于 MQTT 在行业中的广泛使用,它们与物联网设备具有广泛的兼容性。这些主题可以处理来自众多设备的大量消息,从而确保可扩展性。它们还允许根据内容或设备类型灵活地路由和筛选消息。此外, AWS IoT Core MQTT 主题与其他 AWS 服务无缝集成,从而简化了迁移过程。
数据从 MQTT 主题流入一个架构,该架构结合了 Amazon Kinesis Data Streams、 AWS Lambda 一个函数、一个亚马逊 DynamoDB 表和亚马逊计划。 EventBridge 这种服务组合复制并增强了以前提供的功能 AWS IoT Events,使您可以更加灵活地控制物联网数据处理管道。
比较架构
当前 AWS IoT Events 架构通过 AWS IoT Core 规则和 BatchPutMessage
API 提取数据。该架构 AWS IoT Core 用于数据摄取和事件发布,消息通过 AWS IoT Events 输入路由到定义状态逻辑的探测器模型。IAM 角色管理必要的权限。
新的解决方案 AWS IoT Core 用于数据摄取(现在有专门的输入和输出 MQTT 主题)。它引入了用于数据分区的 Kinesis Data Streams 和用于状态逻辑的评估器 Lambda 函数。设备状态现在存储在 DynamoDB 表中,增强的 IAM 角色管理这些服务的权限。
用途 | 解决方案 | 差异 |
---|---|---|
数据摄取 — 从物联网设备接收数据 |
AWS IoT Core |
现在需要两个不同的 MQTT 主题:一个用于摄取设备数据,另一个用于发布输出事件 |
消息方向-将传入的消息路由到相应的服务 |
AWS IoT Core 邮件路由规则 |
保持相同的路由功能,但现在将消息定向到 Kinesis Data Streams 而不是 AWS IoT Events |
数据处理-处理和组织传入的数据流 |
Kinesis Data Streams |
取代 AWS IoT Events 输入功能,使用设备 ID 分区提供数据摄取,用于消息处理 |
逻辑评估-处理状态变化并触发操作 |
评估者 Lambda |
取代 AWS IoT Events 探测器模型,通过代码而不是可视化工作流程提供可自定义的状态逻辑评估 |
状态管理-维护设备状态 |
DynamoDB 表 |
提供设备状态永久存储的新组件,取代了内部 AWS IoT Events 状态管理 |
安全-管理服务权限 |
IAM 角色 |
更新后的权限现在包括访问 Kinesis Data Streams、DynamoDB 以及 EventBridge 现有权限 AWS IoT Core |
步骤 1:(可选)导出 AWS IoT Events 探测器型号配置
在创建新资源之前,请导出您的 AWS IoT Events 探测器模型定义。它们包含您的事件处理逻辑,可以作为实施新解决方案的历史参考。
步骤 2:创建 IAM 角色
创建 IAM 角色以提供复制功能的权限 AWS IoT Events。本示例中的角色授予访问 DynamoDB 以进行状态管理 EventBridge、调度、Kinesis Data Streams 用于数据 AWS IoT Core 摄取、发布消息和记录的权限。 CloudWatch 这些服务共同起到替代作用 AWS IoT Events。
-
创建具有以下权限的 IAM 角色。有关创建 IAM 角色的更多详细说明,请参阅 I A M 用户指南中的创建角色以向 AWS 服务委派权限。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
添加以下 IAM 角色信任策略。信任策略允许指定的 AWS 服务担任 IAM 角色,以便它们可以执行必要的操作。有关创建 IAM 信任策略的更多详细说明,请参阅 I AM 用户指南中的使用自定义信任策略创建角色。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
第 3 步:创建 Amazon Kinesis Data Streams
使用或创建 Amazon Kinesis Data Streams AWS Management Console 。 AWS CLI
步骤 4:创建或更新 MQTT 消息路由规则
您可以创建新的 MQTT 消息路由规则或更新现有规则。
步骤 5:获取目标 MQTT 主题的终端节点
使用目标 MQTT 主题配置您的主题发布传出消息的位置,取代之前由 AWS IoT Events处理的功能。终端节点是您的 AWS 账户和地区所独有的。
第 6 步:创建亚马逊 DynamoDB 表
Amazon DynamoDB 表取代了 AWS IoT Events的状态管理功能,为在新解决方案架构中保留和管理设备状态和探测器模型逻辑提供了一种可扩展且灵活的方式。
步骤 7:创建 AWS Lambda 函数(控制台)
Lambda 函数用作核心处理引擎,取代了的探测器模型评估逻辑。 AWS IoT Events在示例中,我们与其他 AWS 服务集成,以根据您定义的规则处理传入数据、管理状态和触发操作。
使用运行时创建一个 Lambda 函数。NodeJS使用以下代码片段,替换硬编码的常量:
-
选择创建函数。
-
输入函数名称的名称。
-
选择 NodeJS 22.x 作为运行时间。
-
在更改默认执行角色下拉列表中,选择使用现有角色,然后选择您在之前的步骤中创建的 IAM 角色。
-
选择创建函数。
-
替换硬编码常量后,粘贴以下代码片段。
-
函数创建后,在 “代码” 选项卡下粘贴以下代码示例,将端点替换为自己的
your-destination-endpoint
端点。
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
第 8 步:添加 Amazon Kinesis Data Streams 触发器
使用或将 Kinesis Data Streams 触发器添加到 Lambda 函数中。 AWS Management Console AWS CLI
在您的 Lambda 函数中添加 Kinesis Data Streams 触发器可在您的数据摄取管道和处理逻辑之间建立连接,使其能够自动评估传入的物联网数据流并实时对事件做出反应,类似于处理输入的方式。 AWS IoT Events
步骤 9:测试数据摄取和输出功能 ()AWS CLI
根据您在探测器模型中定义的内容向 MQTT 主题发布有效负载。以下是 MQTT 主题的示例负载your-topic-name
,用于测试实现。
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
您应该会看到一条发布到主题的 MQTT 消息,其中包含以下(或类似)内容:
{ "state": "alarm detected, timer started" }