終止支援通知:2026 年 5 月 20 日, AWS 將終止對 的支援 AWS IoT Events。2026 年 5 月 20 日之後,您將無法再存取 AWS IoT Events 主控台或 AWS IoT Events 資源。如需詳細資訊,請參閱AWS IoT Events 終止支援。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
中的偵測器模型遷移程序 AWS IoT Events
本節說明替代解決方案,可在您遷移時提供類似的偵測器模型功能 AWS IoT Events。
您可以透過 AWS IoT Core 規則將資料擷取遷移至其他服務 AWS 的組合。資料可以路由到 AWS IoT Core MQTT 主題,而不是透過 BatchPutMessage API 擷取資料。
此遷移方法利用 AWS IoT Core MQTT 主題做為 IoT 資料的進入點,取代 的直接輸入 AWS IoT Events。MQTT 主題的選擇有幾個主要原因。由於 MQTT 在業界的廣泛使用,它們提供了與 IoT 裝置的廣泛相容性。這些主題可以處理來自許多裝置的大量訊息,以確保可擴展性。它們也提供根據內容或裝置類型路由和篩選訊息的彈性。此外, AWS IoT Core MQTT 主題與其他 AWS 服務無縫整合,促進遷移程序。
資料從 MQTT 主題流向結合 Amazon Kinesis Data Streams、 AWS Lambda 函數、Amazon DynamoDB 資料表和 Amazon EventBridge 排程的架構。此服務組合會複寫並增強 先前提供的功能 AWS IoT Events,讓您更靈活地控制 IoT 資料處理管道。
比較架構
目前的 AWS IoT Events 架構會透過 AWS IoT Core 規則和 BatchPutMessage
API 擷取資料。此架構使用 AWS IoT Core 進行資料擷取和事件發佈,並將訊息透過 AWS IoT Events 輸入路由至定義狀態邏輯的偵測器模型。IAM 角色會管理必要的許可。
新的解決方案 AWS IoT Core 會維護資料擷取 (現在具有專用輸入和輸出 MQTT 主題)。它引入用於資料分割的 Kinesis Data Streams 和用於狀態邏輯的評估器 Lambda 函數。裝置狀態現在會存放在 DynamoDB 資料表中,而增強型 IAM 角色會管理這些服務的許可。
用途 | 解決方案 | 差異 |
---|---|---|
資料擷取 – 從 IoT 裝置接收資料 |
AWS IoT Core |
現在需要兩個不同的 MQTT 主題:一個用於擷取裝置資料,另一個用於發佈輸出事件 |
訊息方向 – 將傳入的訊息路由到適當的服務 |
AWS IoT Core 訊息路由規則 |
維持相同的路由功能,但現在會將訊息導向 Kinesis Data Streams,而不是 AWS IoT Events |
資料處理 – 處理和組織傳入的資料串流 |
Kinesis Data Streams |
取代 AWS IoT Events 輸入功能,使用裝置 ID 分割提供資料擷取以進行訊息處理 |
邏輯評估 – 處理狀態變更並觸發動作 |
評估器 Lambda |
取代 AWS IoT Events 偵測器模型,透過程式碼而非視覺化工作流程提供可自訂的狀態邏輯評估 |
狀態管理 – 維護裝置狀態 |
DynamoDB 表 |
提供裝置狀態持久性儲存的新元件,取代內部 AWS IoT Events 狀態管理 |
安全性 – 管理服務許可 |
IAM 角色 |
已更新的許可現在除了現有 AWS IoT Core 許可之外,還包括對 Kinesis Data Streams、DynamoDB 和 EventBridge 的存取 |
步驟 1:(選用) 匯出 AWS IoT Events 偵測器模型組態
建立新資源之前,請先匯出 AWS IoT Events 偵測器模型定義。這些包含您的事件處理邏輯,可以做為實作新解決方案的歷史參考。
步驟 2:建立 IAM 角色
建立 IAM 角色以提供複寫 功能的許可 AWS IoT Events。此範例中的角色會授予 DynamoDB 的存取權以進行狀態管理、EventBridge 用於排程、Kinesis Data Streams 用於資料擷取、 AWS IoT Core 用於發佈訊息,以及 CloudWatch 用於記錄。這些服務共同用來取代 AWS IoT Events。
-
建立具備下列許可的 IAM 角色。如需建立 IAM 角色的詳細說明,請參閱《IAM 使用者指南》中的建立角色以將許可委派給 AWS 服務。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
新增下列 IAM 角色信任政策。信任政策允許指定的 AWS 服務擔任 IAM 角色,以便他們可以執行必要的動作。如需建立 IAM 信任政策的詳細說明,請參閱《IAM 使用者指南》中的使用自訂信任政策建立角色。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
步驟 3:建立 Amazon Kinesis Data Streams
使用 AWS Management Console 或 建立 Amazon Kinesis Data Streams AWS CLI。
步驟 4:建立或更新 MQTT 訊息路由規則
您可以建立新的 MQTT 訊息路由規則或更新現有的規則。
步驟 5:取得目的地 MQTT 主題的端點
使用目的地 MQTT 主題來設定主題發佈傳出訊息的位置,取代先前處理的功能 AWS IoT Events。端點對 AWS 您的帳戶和區域是唯一的。
步驟 6:建立 Amazon DynamoDB 資料表
Amazon DynamoDB 資料表會取代 的狀態管理功能 AWS IoT Events,提供可擴展且彈性的方式,在新的解決方案架構中保留和管理裝置狀態和偵測器模型邏輯。
步驟 7:建立 AWS Lambda 函數 (主控台)
Lambda 函數做為核心處理引擎,取代 的偵測器模型評估邏輯 AWS IoT Events。在此範例中,我們與其他 AWS 服務整合,以根據您定義的規則處理傳入資料、管理狀態和觸發動作。
使用NodeJS執行時間建立 Lambda 函數。使用下列程式碼片段,取代硬式編碼常數:
-
選擇 Create function (建立函數)。
-
輸入函數名稱的名稱。
-
選取 NodeJS 22.x 做為執行時間。
-
在變更預設執行角色下拉式清單中,選擇使用現有角色,然後選取您在先前步驟中建立的 IAM 角色。
-
選擇 Create function (建立函數)。
-
在取代硬式編碼常數之後,貼上下列程式碼片段。
-
函數建立之後,請在程式碼索引標籤下貼上下列程式碼範例,以您自己的程式碼取代
your-destination-endpoint
端點。
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
步驟 8:新增 Amazon Kinesis Data Streams 觸發條件
使用 AWS Management Console 或 將 Kinesis Data Streams 觸發新增至 Lambda 函數 AWS CLI。
將 Kinesis Data Streams 觸發新增至 Lambda 函數會建立資料擷取管道與處理邏輯之間的連線,讓它自動評估傳入的 IoT 資料串流,並即時對事件做出反應,類似於 AWS IoT Events 處理輸入的方式。
步驟 9:測試資料擷取和輸出功能 (AWS CLI)
根據您在偵測器模型中定義的內容,將承載發佈至 MQTT 主題。以下是 MQTT 主題的範例承載,your-topic-name
用於測試實作。
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
您應該會看到 MQTT 訊息發佈至具有下列 (或類似) 內容的主題:
{ "state": "alarm detected, timer started" }