中的偵測器模型遷移程序 AWS IoT Events - AWS IoT Events

終止支援通知:2026 年 5 月 20 日, AWS 將終止對 的支援 AWS IoT Events。2026 年 5 月 20 日之後,您將無法再存取 AWS IoT Events 主控台或 AWS IoT Events 資源。如需詳細資訊,請參閱AWS IoT Events 終止支援

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

中的偵測器模型遷移程序 AWS IoT Events

本節說明替代解決方案,可在您遷移時提供類似的偵測器模型功能 AWS IoT Events。

您可以透過 AWS IoT Core 規則將資料擷取遷移至其他服務 AWS 的組合。資料可以路由到 AWS IoT Core MQTT 主題,而不是透過 BatchPutMessage API 擷取資料。

此遷移方法利用 AWS IoT Core MQTT 主題做為 IoT 資料的進入點,取代 的直接輸入 AWS IoT Events。MQTT 主題的選擇有幾個主要原因。由於 MQTT 在業界的廣泛使用,它們提供了與 IoT 裝置的廣泛相容性。這些主題可以處理來自許多裝置的大量訊息,以確保可擴展性。它們也提供根據內容或裝置類型路由和篩選訊息的彈性。此外, AWS IoT Core MQTT 主題與其他 AWS 服務無縫整合,促進遷移程序。

資料從 MQTT 主題流向結合 Amazon Kinesis Data Streams、 AWS Lambda 函數、Amazon DynamoDB 資料表和 Amazon EventBridge 排程的架構。此服務組合會複寫並增強 先前提供的功能 AWS IoT Events,讓您更靈活地控制 IoT 資料處理管道。

比較架構

目前的 AWS IoT Events 架構會透過 AWS IoT Core 規則和 BatchPutMessage API 擷取資料。此架構使用 AWS IoT Core 進行資料擷取和事件發佈,並將訊息透過 AWS IoT Events 輸入路由至定義狀態邏輯的偵測器模型。IAM 角色會管理必要的許可。

新的解決方案 AWS IoT Core 會維護資料擷取 (現在具有專用輸入和輸出 MQTT 主題)。它引入用於資料分割的 Kinesis Data Streams 和用於狀態邏輯的評估器 Lambda 函數。裝置狀態現在會存放在 DynamoDB 資料表中,而增強型 IAM 角色會管理這些服務的許可。

用途 解決方案 差異

資料擷取 – 從 IoT 裝置接收資料

AWS IoT Core

現在需要兩個不同的 MQTT 主題:一個用於擷取裝置資料,另一個用於發佈輸出事件

訊息方向 – 將傳入的訊息路由到適當的服務

AWS IoT Core 訊息路由規則

維持相同的路由功能,但現在會將訊息導向 Kinesis Data Streams,而不是 AWS IoT Events

資料處理 – 處理和組織傳入的資料串流

Kinesis Data Streams

取代 AWS IoT Events 輸入功能,使用裝置 ID 分割提供資料擷取以進行訊息處理

邏輯評估 – 處理狀態變更並觸發動作

評估器 Lambda

取代 AWS IoT Events 偵測器模型,透過程式碼而非視覺化工作流程提供可自訂的狀態邏輯評估

狀態管理 – 維護裝置狀態

DynamoDB 表

提供裝置狀態持久性儲存的新元件,取代內部 AWS IoT Events 狀態管理

安全性 – 管理服務許可

IAM 角色

已更新的許可現在除了現有 AWS IoT Core 許可之外,還包括對 Kinesis Data Streams、DynamoDB 和 EventBridge 的存取

步驟 1:(選用) 匯出 AWS IoT Events 偵測器模型組態

建立新資源之前,請先匯出 AWS IoT Events 偵測器模型定義。這些包含您的事件處理邏輯,可以做為實作新解決方案的歷史參考。

Console

使用 匯出偵測器模型組態時 AWS IoT Events AWS Management Console,請執行下列步驟:

使用 匯出偵測器模型 AWS Management Console
  1. 登入 AWS IoT Events 主控台

  2. 在左側導覽窗格中,選擇 Detector models (偵測器模型)

  3. 選取要匯出的偵測器模型。

  4. 選擇 Export (匯出)。閱讀有關輸出的資訊訊息,然後再次選擇匯出

  5. 針對您要匯出的每個偵測器模型重複此程序。

包含偵測器模型 JSON 輸出的檔案會新增至瀏覽器的下載資料夾。您可以選擇性地儲存每個偵測器模型組態,以保留歷史資料。

AWS CLI

使用 AWS CLI執行下列命令來匯出偵測器模型組態:

使用 匯出偵測器模型 AWS CLI
  1. 列出您帳戶中的所有偵測器模型:

    aws iotevents list-detector-models
  2. 針對每個偵測器模型,執行下列動作來匯出其組態:

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. 儲存每個偵測器模型的輸出。

步驟 2:建立 IAM 角色

建立 IAM 角色以提供複寫 功能的許可 AWS IoT Events。此範例中的角色會授予 DynamoDB 的存取權以進行狀態管理、EventBridge 用於排程、Kinesis Data Streams 用於資料擷取、 AWS IoT Core 用於發佈訊息,以及 CloudWatch 用於記錄。這些服務共同用來取代 AWS IoT Events。

  1. 建立具備下列許可的 IAM 角色。如需建立 IAM 角色的詳細說明,請參閱《IAM 使用者指南》中的建立角色以將許可委派給 AWS 服務

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. 新增下列 IAM 角色信任政策。信任政策允許指定的 AWS 服務擔任 IAM 角色,以便他們可以執行必要的動作。如需建立 IAM 信任政策的詳細說明,請參閱《IAM 使用者指南》中的使用自訂信任政策建立角色

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

步驟 3:建立 Amazon Kinesis Data Streams

使用 AWS Management Console 或 建立 Amazon Kinesis Data Streams AWS CLI。

Console

若要使用 建立 Kinesis 資料串流 AWS Management Console,請遵循 Amazon Kinesis Data Streams 開發人員指南建立資料串流頁面上的程序。

根據裝置計數和訊息承載大小調整碎片計數。

AWS CLI

使用 AWS CLI建立 Amazon Kinesis Data Streams,從您的裝置擷取和分割資料。

Kinesis Data Streams 用於此遷移,以取代 的資料擷取功能 AWS IoT Events。它提供可擴展且有效率的方式來從 IoT 裝置收集、處理和分析即時串流資料,同時提供靈活的資料處理和與其他 AWS 服務的整合。

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

根據裝置計數和訊息承載大小調整碎片計數。

步驟 4:建立或更新 MQTT 訊息路由規則

您可以建立新的 MQTT 訊息路由規則或更新現有的規則。

Console
  1. 判斷您是否需要新的 MQTT 訊息路由規則,或是是否可以更新現有的規則。

  2. 開啟 AWS IoT Core 主控台

  3. 在導覽窗格中,選擇訊息路由,然後選擇規則

  4. 管理區段中,選擇訊息路由,然後選擇規則

  5. 選擇建立規則

  6. 指定規則屬性頁面上,輸入 AWS IoT Core 規則名稱的規則名稱。針對規則描述 - 選用,輸入描述以識別您正在處理事件並將其轉送至 Kinesis Data Streams。

  7. 設定 SQL 陳述式頁面上,輸入 SQL 陳述式的下列項目:SELECT * FROM 'your-database',然後選擇下一步

  8. 連接規則動作頁面上的規則動作下,選擇 kinesis

  9. 選擇串流的 Kinesis 串流。針對分割區索引鍵,輸入 your-instance-id。選取 IAM 角色的適當角色,然後選擇新增規則動作

如需詳細資訊,請參閱建立 AWS IoT 規則以將裝置資料路由至其他 服務

AWS CLI
  1. 使用下列內容建立 JSON 檔案 。此 JSON 組態檔案會定義 AWS IoT Core 規則,從主題中選取所有訊息,並使用執行個體 ID 做為分割區索引鍵將其轉送至指定的 Kinesis 串流。

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. 使用 建立 MQTT 主題規則 AWS CLI。此步驟使用 AWS CLI ,使用 events_rule.json 檔案中定義的組態來建立 AWS IoT Core 主題規則。

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

步驟 5:取得目的地 MQTT 主題的端點

使用目的地 MQTT 主題來設定主題發佈傳出訊息的位置,取代先前處理的功能 AWS IoT Events。端點對 AWS 您的帳戶和區域是唯一的。

Console
  1. 開啟 AWS IoT Core 主控台

  2. 在左側導覽面板的連線區段中,選擇網域組態

  3. 選擇 iot:Data-ATS 網域組態以開啟組態的詳細資訊頁面。

  4. 複製網域名稱值。此值是端點。儲存端點值,因為您會在後續步驟中需要它。

AWS CLI

執行下列命令以取得 AWS IoT Core 端點,以發佈您帳戶的傳出訊息。

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

步驟 6:建立 Amazon DynamoDB 資料表

Amazon DynamoDB 資料表會取代 的狀態管理功能 AWS IoT Events,提供可擴展且彈性的方式,在新的解決方案架構中保留和管理裝置狀態和偵測器模型邏輯。

Console

建立 Amazon DynamoDB 資料表以保留偵測器模型的狀態。如需詳細資訊,請參閱《Amazon DynamoDB 開發人員指南》中的在 DynamoDB 中建立資料表 DynamoDB

使用下列表格詳細資訊:

  • 資料表名稱中,輸入您選擇的資料表名稱。

  • 針對分割區索引鍵,輸入您自己的執行個體 ID。

  • 您可以使用資料表設定的預設設定

AWS CLI

執行下列命令來建立 DynamoDB 資料表。

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

步驟 7:建立 AWS Lambda 函數 (主控台)

Lambda 函數做為核心處理引擎,取代 的偵測器模型評估邏輯 AWS IoT Events。在此範例中,我們與其他 AWS 服務整合,以根據您定義的規則處理傳入資料、管理狀態和觸發動作。

使用NodeJS執行時間建立 Lambda 函數。使用下列程式碼片段,取代硬式編碼常數:

  1. 開啟 AWS Lambda console

  2. 選擇 Create function (建立函數)

  3. 輸入函數名稱的名稱

  4. 選取 NodeJS 22.x 做為執行時間

  5. 變更預設執行角色下拉式清單中,選擇使用現有角色,然後選取您在先前步驟中建立的 IAM 角色。

  6. 選擇 Create function (建立函數)

  7. 在取代硬式編碼常數之後,貼上下列程式碼片段。

  8. 函數建立之後,請在程式碼索引標籤下貼上下列程式碼範例,以您自己的程式碼取代your-destination-endpoint端點。

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

步驟 8:新增 Amazon Kinesis Data Streams 觸發條件

使用 AWS Management Console 或 將 Kinesis Data Streams 觸發新增至 Lambda 函數 AWS CLI。

將 Kinesis Data Streams 觸發新增至 Lambda 函數會建立資料擷取管道與處理邏輯之間的連線,讓它自動評估傳入的 IoT 資料串流,並即時對事件做出反應,類似於 AWS IoT Events 處理輸入的方式。

Console

如需詳細資訊,請參閱《 AWS Lambda 開發人員指南》中的建立事件來源映射以叫用 Lambda 函數

針對事件來源映射詳細資訊,請使用下列項目:

AWS CLI

執行下列命令來建立 Lambda 函數觸發。

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

步驟 9:測試資料擷取和輸出功能 (AWS CLI)

根據您在偵測器模型中定義的內容,將承載發佈至 MQTT 主題。以下是 MQTT 主題的範例承載,your-topic-name用於測試實作。

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

您應該會看到 MQTT 訊息發佈至具有下列 (或類似) 內容的主題:

{ "state": "alarm detected, timer started" }