のディテクターモデルの移行手順 AWS IoT Events - AWS IoT Events

サポート終了通知: 2026 年 5 月 20 日に、 AWS は のサポートを終了します AWS IoT Events。2026 年 5 月 20 日以降、 AWS IoT Events コンソールまたは AWS IoT Events リソースにアクセスできなくなります。詳細については、AWS IoT Events 「サポート終了」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

のディテクターモデルの移行手順 AWS IoT Events

このセクションでは、移行時に同様のディテクターモデル機能を提供する代替ソリューションについて説明します AWS IoT Events。

AWS IoT Core ルールによるデータインジェストは、他の AWS サービスの組み合わせに移行できます。BatchPutMessage API によるデータ取り込みの代わりに、データを AWS IoT Core MQTT トピックにルーティングできます。

この移行アプローチでは、 AWS IoT Core MQTT トピックを IoT データのエントリポイントとして活用し、 への直接入力を置き換えます AWS IoT Events。MQTT トピックは、いくつかの主な理由で選択されます。MQTT は業界で広く使用されているため、IoT デバイスとの幅広い互換性を提供します。これらのトピックでは、多数のデバイスからの大量のメッセージを処理できるため、スケーラビリティが確保されます。また、コンテンツまたはデバイスタイプに基づいてメッセージのルーティングとフィルタリングを柔軟に行うことができます。さらに、 AWS IoT Core MQTT トピックは他の AWS サービスとシームレスに統合されるため、移行プロセスが容易になります。

MQTT トピックから Amazon Kinesis Data Streams、 AWS Lambda 関数、Amazon DynamoDB テーブル、Amazon EventBridge スケジュールを組み合わせたアーキテクチャにデータが流れます。このサービスの組み合わせにより、 によって以前に提供された機能がレプリケートおよび強化され AWS IoT Events、IoT データ処理パイプラインをより柔軟に制御できます。

アーキテクチャの比較

現在の AWS IoT Events アーキテクチャは、 AWS IoT Core ルールと BatchPutMessage API を通じてデータを取り込みます。このアーキテクチャでは、 AWS IoT Core を使用してデータの取り込みとイベント発行を行います。メッセージは、状態ロジックを定義するディテクターモデルへの AWS IoT Events 入力を介してルーティングされます。IAM ロールは、必要なアクセス許可を管理します。

新しいソリューションは、データインジェスト (現在は専用の入出力 MQTT トピックを使用) AWS IoT Core のために を維持します。データパーティショニング用の Kinesis Data Streams と、状態ロジック用の評価者 Lambda 関数が導入されました。デバイスの状態が DynamoDB テーブルに保存され、拡張 IAM ロールがこれらのサービス全体のアクセス許可を管理するようになりました。

目的 ソリューション 相違点

データ取り込み — IoT デバイスからデータを受信します

AWS IoT Core

2 つの異なる MQTT トピックが必要になりました。1 つはデバイスデータの取り込み用、もう 1 つは出力イベントの発行用です。

メッセージの方向 – 受信メッセージを適切なサービスにルーティングします

AWS IoT Core メッセージルーティングルール

同じルーティング機能を維持しますが、 ではなく Kinesis Data Streams にメッセージを送信するようになりました。 AWS IoT Events

データ処理 – 受信データストリームを処理および整理します

Kinesis Data Streams

AWS IoT Events 入力機能を置き換え、データインジェストをメッセージ処理用のデバイス ID パーティショニングに置き換えます

ロジック評価 – 状態の変更を処理し、アクションをトリガーします

評価者 Lambda

AWS IoT Events ディテクターモデルを置き換え、ビジュアルワークフローの代わりにコードを通じてカスタマイズ可能な状態ロジック評価を提供

状態管理 – デバイスの状態を維持します

DynamoDB テーブル

デバイス状態の永続的ストレージを提供し、内部 AWS IoT Events 状態管理を置き換える新しいコンポーネント

セキュリティ – サービスアクセス許可を管理します

IAM ロール

更新されたアクセス許可には、既存のアクセス AWS IoT Core 許可に加えて、Kinesis Data Streams、DynamoDB、EventBridge へのアクセスが含まれるようになりました。

ステップ 1: (オプション) AWS IoT Events ディテクターモデル設定をエクスポートする

新しいリソースを作成する前に、 AWS IoT Events ディテクターモデル定義をエクスポートします。これらにはイベント処理ロジックが含まれており、新しいソリューションを実装するための過去のリファレンスとして使用できます。

Console

を使用して AWS IoT Events AWS Management Console、次の手順を実行してディテクターモデル設定をエクスポートします。

を使用してディテクターモデルをエクスポートするには AWS Management Console
  1. AWS IoT Events コンソール にログインします。

  2. 左のナビゲーションペインで、[Detector models (ディテクターモデル)] を選択します。

  3. エクスポートするディテクターモデルを選択します。

  4. [エクスポート] を選択します。出力に関する情報メッセージを読み、もう一度エクスポートを選択します。

  5. エクスポートするディテクターモデルごとにプロセスを繰り返します。

ディテクターモデルの JSON 出力を含むファイルがブラウザのダウンロードフォルダに追加されます。オプションで各ディテクターモデル設定を保存して、履歴データを保持できます。

AWS CLI

を使用して AWS CLI、次のコマンドを実行してディテクターモデル設定をエクスポートします。

を使用してディテクターモデルをエクスポートするには AWS CLI
  1. アカウント内のすべてのディテクターモデルを一覧表示します。

    aws iotevents list-detector-models
  2. ディテクターモデルごとに、以下を実行して設定をエクスポートします。

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. 各ディテクターモデルの出力を保存します。

ステップ 2: IAM ロールを作成する

IAM ロールを作成して、 の機能をレプリケートするアクセス許可を付与します AWS IoT Events。この例のロールは、状態管理用の DynamoDB、スケジューリング用の EventBridge、データ取り込み用の Kinesis Data Streams、メッセージの発行 AWS IoT Core 用の 、ログ記録用の CloudWatch へのアクセスを許可します。これらのサービスは、 の代替として連携して機能します AWS IoT Events。

  1. 以下のアクセス許可を持つ IAM ロールを作成します。IAM ロールの作成の詳細については、IAM ユーザーガイド「 AWS サービスにアクセス許可を委任するロールを作成する」を参照してください。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. 次の IAM ロールの信頼ポリシーを追加します。信頼ポリシーは、指定された AWS サービスが必要なアクションを実行できるように、IAM ロールを引き受けることを許可します。IAM 信頼ポリシーの作成方法の詳細については、IAM ユーザーガイドのカスタム信頼ポリシーを使用してロールを作成するを参照してください。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

ステップ 3: Amazon Kinesis Data Streams を作成する

AWS Management Console または を使用して Amazon Kinesis Data Streams を作成します AWS CLI。

Console

を使用して Kinesis データストリームを作成するには AWS Management Console、Amazon Kinesis Data Streams デベロッパーガイド」の「データストリームの作成」ページにある手順に従ってください。

デバイス数とメッセージペイロードサイズに基づいてシャード数を調整します。

AWS CLI

を使用して AWS CLI Amazon Kinesis Data Streams を作成し、デバイスからデータを取り込んでパーティション化します。

Kinesis Data Streams は、この移行でデータの取り込み機能を置き換えるために使用されます AWS IoT Events。IoT IoT デバイスからリアルタイムのストリーミングデータを収集、処理、分析するスケーラブルで効率的な方法を提供すると同時に、柔軟なデータ処理と他の AWS サービスとの統合を提供します。

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

デバイス数とメッセージペイロードサイズに基づいてシャード数を調整します。

ステップ 4: MQTT メッセージルーティングルールを作成または更新する

新しい MQTT メッセージルーティングルールを作成するか、既存のルールを更新できます。

Console
  1. 新しい MQTT メッセージルーティングルールが必要かどうか、または既存のルールを更新できるかどうかを決定します。

  2. AWS IoT Core コンソールを開きます。

  3. ナビゲーションペインで、メッセージルーティングを選択し、ルールを選択します

  4. 管理セクションで、メッセージルーティングを選択し、ルールを選択します。

  5. [‬ルールを作成]‭ を選択します。

  6. ルールプロパティの指定 ページで、 AWS IoT Core ルール名のルール名を入力します。ルールの説明 - オプションで、イベントを処理して Kinesis Data Streams に転送していることを識別する説明を入力します。

  7. SQL ステートメントの設定ページで、SQL ステートメントに「」と入力しSELECT * FROM 'your-database'次へを選択します。

  8. ルールアクションのアタッチページで、ルールアクションkinesis を選択します。

  9. ストリームの Kinesis ストリームを選択します。パーティションキーに「your-instance-id」と入力します。IAM ロールに適したロールを選択し、ルールの追加アクションを選択します。

詳細については、「Creating AWS IoT rules to route device data to other services」を参照してください。

AWS CLI
  1. 次の内容を含む JSON ファイル を作成します。この JSON 設定ファイルは、トピックからすべてのメッセージを選択し、インスタンス ID をパーティションキーとして使用して、指定された Kinesis ストリームに転送する AWS IoT Core ルールを定義します。

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. を使用して MQTT トピックルールを作成します AWS CLI。このステップでは AWS CLI 、 を使用して、 events_rule.json ファイルで定義された設定を使用して AWS IoT Core トピックルールを作成します。

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

ステップ 5: 送信先 MQTT トピックのエンドポイントを取得する

送信先 MQTT トピックを使用して、トピックが送信メッセージを発行する場所を設定し、以前に処理された機能を置き換えます AWS IoT Events。エンドポイントは、 AWS アカウントとリージョンに固有です。

Console
  1. AWS IoT Core コンソールを開きます。

  2. 左側のナビゲーションパネルの接続セクションで、ドメイン設定を選択します。

  3. iot:Data-ATS ドメイン設定を選択して、設定の詳細ページを開きます。

  4. ドメイン名の値をコピーします。この値はエンドポイントです。後のステップで必要になるため、エンドポイント値を保存します。

AWS CLI

次のコマンドを実行して、アカウントの送信メッセージを発行するための AWS IoT Core エンドポイントを取得します。

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

ステップ 6: Amazon DynamoDB テーブルを作成する

Amazon DynamoDB テーブルは、 の状態管理機能を置き換え AWS IoT Events、新しいソリューションアーキテクチャでデバイスの状態とディテクターモデルロジックを持続および管理するためのスケーラブルで柔軟な方法を提供します。

Console

Amazon DynamoDB テーブルを作成して、ディテクターモデルの状態を維持します。詳細については、「Amazon DynamoDB デベロッパーガイド」の「DynamoDB でテーブルを作成する」を参照してください。 DynamoDB

テーブルの詳細については、以下を使用します。

  • テーブル名には、選択したテーブル名を入力します。

  • パーティションキーには、独自のインスタンス ID を入力します。

  • テーブル設定のデフォルト設定を使用できます。

AWS CLI

次のコマンドを実行して、DynamoDB テーブルを作成します。

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

ステップ 7: AWS Lambda 関数を作成する (コンソール)

Lambda 関数はコア処理エンジンとして機能し、ディテクターモデルの評価ロジックを置き換えます AWS IoT Events。この例では、他の AWS サービスと統合して、定義されたルールに基づいて受信データを処理し、状態を管理し、アクションをトリガーします。

NodeJS ランタイムを使用して Lambda 関数を作成します。次のコードスニペットを使用して、ハードコードされた定数を置き換えます。

  1. AWS Lambda consoleを開きます。

  2. [関数の作成] を選択してください。

  3. 関数名の名前を入力します。

  4. ランタイムとして NodeJS 22.x を選択します。

  5. デフォルトの実行ロールの変更ドロップダウンで、既存のロールを使用を選択し、前のステップで作成した IAM ロールを選択します。

  6. [関数の作成] を選択してください。

  7. ハードコードされた定数を置き換えた後、次のコードスニペットに貼り付けます。

  8. 関数を作成したら、Code タブに次のコード例を貼り付け、your-destination-endpointエンドポイントを独自のコードに置き換えます。

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

ステップ 8: Amazon Kinesis Data Streams トリガーを追加する

AWS Management Console または を使用して、Kinesis Data Streams トリガーを Lambda 関数に追加します AWS CLI。

Lambda 関数に Kinesis Data Streams トリガーを追加すると、データ取り込みパイプラインと処理ロジック間の接続が確立され、入力 AWS IoT Events の処理方法と同様に、受信 IoT データストリームを自動的に評価し、イベントにリアルタイムで対応できるようになります。

Console

詳細については、「 デベロッパーガイド」の「Lambda 関数を呼び出すイベントソースマッピングを作成する」を参照してください。 AWS Lambda

イベントソースマッピングの詳細には、以下を使用します。

AWS CLI

次のコマンドを実行して、Lambda 関数トリガーを作成します。

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

ステップ 9: データの取り込みと出力機能をテストする (AWS CLI)

ディテクターモデルで定義した内容に基づいて MQTT トピックにペイロードを発行します。実装をテストyour-topic-nameするための MQTT トピックへのペイロードの例を次に示します。

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

次の (または同様の) コンテンツを含むトピックに発行された MQTT メッセージが表示されます。

{ "state": "alarm detected, timer started" }