サポート終了通知: 2026 年 5 月 20 日に、 AWS は のサポートを終了します AWS IoT Events。2026 年 5 月 20 日以降、 AWS IoT Events コンソールまたは AWS IoT Events リソースにアクセスできなくなります。詳細については、AWS IoT Events 「サポート終了」を参照してください。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
のディテクターモデルの移行手順 AWS IoT Events
このセクションでは、移行時に同様のディテクターモデル機能を提供する代替ソリューションについて説明します AWS IoT Events。
AWS IoT Core ルールによるデータインジェストは、他の AWS サービスの組み合わせに移行できます。BatchPutMessage API によるデータ取り込みの代わりに、データを AWS IoT Core MQTT トピックにルーティングできます。
この移行アプローチでは、 AWS IoT Core MQTT トピックを IoT データのエントリポイントとして活用し、 への直接入力を置き換えます AWS IoT Events。MQTT トピックは、いくつかの主な理由で選択されます。MQTT は業界で広く使用されているため、IoT デバイスとの幅広い互換性を提供します。これらのトピックでは、多数のデバイスからの大量のメッセージを処理できるため、スケーラビリティが確保されます。また、コンテンツまたはデバイスタイプに基づいてメッセージのルーティングとフィルタリングを柔軟に行うことができます。さらに、 AWS IoT Core MQTT トピックは他の AWS サービスとシームレスに統合されるため、移行プロセスが容易になります。
MQTT トピックから Amazon Kinesis Data Streams、 AWS Lambda 関数、Amazon DynamoDB テーブル、Amazon EventBridge スケジュールを組み合わせたアーキテクチャにデータが流れます。このサービスの組み合わせにより、 によって以前に提供された機能がレプリケートおよび強化され AWS IoT Events、IoT データ処理パイプラインをより柔軟に制御できます。
アーキテクチャの比較
現在の AWS IoT Events アーキテクチャは、 AWS IoT Core ルールと BatchPutMessage
API を通じてデータを取り込みます。このアーキテクチャでは、 AWS IoT Core を使用してデータの取り込みとイベント発行を行います。メッセージは、状態ロジックを定義するディテクターモデルへの AWS IoT Events 入力を介してルーティングされます。IAM ロールは、必要なアクセス許可を管理します。
新しいソリューションは、データインジェスト (現在は専用の入出力 MQTT トピックを使用) AWS IoT Core のために を維持します。データパーティショニング用の Kinesis Data Streams と、状態ロジック用の評価者 Lambda 関数が導入されました。デバイスの状態が DynamoDB テーブルに保存され、拡張 IAM ロールがこれらのサービス全体のアクセス許可を管理するようになりました。
目的 | ソリューション | 相違点 |
---|---|---|
データ取り込み — IoT デバイスからデータを受信します |
AWS IoT Core |
2 つの異なる MQTT トピックが必要になりました。1 つはデバイスデータの取り込み用、もう 1 つは出力イベントの発行用です。 |
メッセージの方向 – 受信メッセージを適切なサービスにルーティングします |
AWS IoT Core メッセージルーティングルール |
同じルーティング機能を維持しますが、 ではなく Kinesis Data Streams にメッセージを送信するようになりました。 AWS IoT Events |
データ処理 – 受信データストリームを処理および整理します |
Kinesis Data Streams |
AWS IoT Events 入力機能を置き換え、データインジェストをメッセージ処理用のデバイス ID パーティショニングに置き換えます |
ロジック評価 – 状態の変更を処理し、アクションをトリガーします |
評価者 Lambda |
AWS IoT Events ディテクターモデルを置き換え、ビジュアルワークフローの代わりにコードを通じてカスタマイズ可能な状態ロジック評価を提供 |
状態管理 – デバイスの状態を維持します |
DynamoDB テーブル |
デバイス状態の永続的ストレージを提供し、内部 AWS IoT Events 状態管理を置き換える新しいコンポーネント |
セキュリティ – サービスアクセス許可を管理します |
IAM ロール |
更新されたアクセス許可には、既存のアクセス AWS IoT Core 許可に加えて、Kinesis Data Streams、DynamoDB、EventBridge へのアクセスが含まれるようになりました。 |
ステップ 1: (オプション) AWS IoT Events ディテクターモデル設定をエクスポートする
新しいリソースを作成する前に、 AWS IoT Events ディテクターモデル定義をエクスポートします。これらにはイベント処理ロジックが含まれており、新しいソリューションを実装するための過去のリファレンスとして使用できます。
ステップ 2: IAM ロールを作成する
IAM ロールを作成して、 の機能をレプリケートするアクセス許可を付与します AWS IoT Events。この例のロールは、状態管理用の DynamoDB、スケジューリング用の EventBridge、データ取り込み用の Kinesis Data Streams、メッセージの発行 AWS IoT Core 用の 、ログ記録用の CloudWatch へのアクセスを許可します。これらのサービスは、 の代替として連携して機能します AWS IoT Events。
-
以下のアクセス許可を持つ IAM ロールを作成します。IAM ロールの作成の詳細については、IAM ユーザーガイドの「 AWS サービスにアクセス許可を委任するロールを作成する」を参照してください。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:
your-region
:your-account-id
:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region
:your-account-id
:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region
:your-account-id
:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region
:your-account-id
:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region
:your-account-id
:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id
:log-group:/aws/lambda/your-lambda
:*" ] } ] } -
次の IAM ロールの信頼ポリシーを追加します。信頼ポリシーは、指定された AWS サービスが必要なアクションを実行できるように、IAM ロールを引き受けることを許可します。IAM 信頼ポリシーの作成方法の詳細については、IAM ユーザーガイドのカスタム信頼ポリシーを使用してロールを作成するを参照してください。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }
ステップ 3: Amazon Kinesis Data Streams を作成する
AWS Management Console または を使用して Amazon Kinesis Data Streams を作成します AWS CLI。
ステップ 4: MQTT メッセージルーティングルールを作成または更新する
新しい MQTT メッセージルーティングルールを作成するか、既存のルールを更新できます。
ステップ 5: 送信先 MQTT トピックのエンドポイントを取得する
送信先 MQTT トピックを使用して、トピックが送信メッセージを発行する場所を設定し、以前に処理された機能を置き換えます AWS IoT Events。エンドポイントは、 AWS アカウントとリージョンに固有です。
ステップ 6: Amazon DynamoDB テーブルを作成する
Amazon DynamoDB テーブルは、 の状態管理機能を置き換え AWS IoT Events、新しいソリューションアーキテクチャでデバイスの状態とディテクターモデルロジックを持続および管理するためのスケーラブルで柔軟な方法を提供します。
ステップ 7: AWS Lambda 関数を作成する (コンソール)
Lambda 関数はコア処理エンジンとして機能し、ディテクターモデルの評価ロジックを置き換えます AWS IoT Events。この例では、他の AWS サービスと統合して、定義されたルールに基づいて受信データを処理し、状態を管理し、アクションをトリガーします。
NodeJS ランタイムを使用して Lambda 関数を作成します。次のコードスニペットを使用して、ハードコードされた定数を置き換えます。
-
AWS Lambda console
を開きます。 -
[関数の作成] を選択してください。
-
関数名の名前を入力します。
-
ランタイムとして NodeJS 22.x を選択します。
-
デフォルトの実行ロールの変更ドロップダウンで、既存のロールを使用を選択し、前のステップで作成した IAM ロールを選択します。
-
[関数の作成] を選択してください。
-
ハードコードされた定数を置き換えた後、次のコードスニペットに貼り付けます。
-
関数を作成したら、Code タブに次のコード例を貼り付け、
your-destination-endpoint
エンドポイントを独自のコードに置き換えます。
import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://
your-destination-endpoint
-ats.iot.your-region
.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name
', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name
-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region
:your-account-id
:stream/your-kinesis-stream-name
", RoleArn: "arn:aws::iam::your-account-id
:role/service-role/your-iam-role
", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name
-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintionslet processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } );
const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );
ステップ 8: Amazon Kinesis Data Streams トリガーを追加する
AWS Management Console または を使用して、Kinesis Data Streams トリガーを Lambda 関数に追加します AWS CLI。
Lambda 関数に Kinesis Data Streams トリガーを追加すると、データ取り込みパイプラインと処理ロジック間の接続が確立され、入力 AWS IoT Events の処理方法と同様に、受信 IoT データストリームを自動的に評価し、イベントにリアルタイムで対応できるようになります。
ステップ 9: データの取り込みと出力機能をテストする (AWS CLI)
ディテクターモデルで定義した内容に基づいて MQTT トピックにペイロードを発行します。実装をテストyour-topic-name
するための MQTT トピックへのペイロードの例を次に示します。
{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }
次の (または同様の) コンテンツを含むトピックに発行された MQTT メッセージが表示されます。
{ "state": "alarm detected, timer started" }