의 감지기 모델에 대한 마이그레이션 절차 AWS IoT Events - AWS IoT Events

지원 종료 알림: 2026년 5월 20일에 AWS 에 대한 지원이 종료됩니다 AWS IoT Events. 2026년 5월 20일 이후에는 AWS IoT Events 콘솔 또는 AWS IoT Events 리소스에 더 이상 액세스할 수 없습니다. 자세한 내용은 AWS IoT Events 지원 종료를 참조하세요.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

의 감지기 모델에 대한 마이그레이션 절차 AWS IoT Events

이 섹션에서는 마이그레이션할 때 유사한 감지기 모델 기능을 제공하는 대체 솔루션에 대해 설명합니다 AWS IoT Events.

AWS IoT Core 규칙을 통해 데이터 수집을 다른 AWS 서비스의 조합으로 마이그레이션할 수 있습니다. BatchPutMessage API를 통한 데이터 수집 대신 데이터를 AWS IoT Core MQTT 주제로 라우팅할 수 있습니다.

이 마이그레이션 접근 방식은 AWS IoT Core MQTT 주제를 IoT 데이터의 진입점으로 활용하여 직접 입력을 대체합니다 AWS IoT Events. MQTT 주제는 몇 가지 주요 이유로 선택됩니다. MQTT는 업계에서 널리 사용되므로 IoT 디바이스와의 호환성이 광범위합니다. 이러한 주제는 수많은 디바이스의 대량 메시지를 처리하여 확장성을 보장할 수 있습니다. 또한 콘텐츠 또는 디바이스 유형에 따라 메시지를 라우팅하고 필터링할 수 있는 유연성도 제공합니다. 또한 AWS IoT Core MQTT 주제는 다른 AWS 서비스와 원활하게 통합되어 마이그레이션 프로세스를 용이하게 합니다.

MQTT 주제에서 Amazon Kinesis Data Streams, 함수, Amazon DynamoDB 테이블 및 Amazon EventBridge 일정을 결합한 AWS Lambda 아키텍처로 데이터가 흐릅니다. 이러한 서비스 조합은 이전에에서 제공한 기능을 복제하고 개선 AWS IoT Events하여 IoT 데이터 처리 파이프라인에 대한 유연성과 제어를 강화합니다.

아키텍처 비교

현재 AWS IoT Events 아키텍처는 AWS IoT Core 규칙과 BatchPutMessage API를 통해 데이터를 수집합니다. 이 아키텍처는 데이터 수집 및 이벤트 게시 AWS IoT Core 에를 사용하며, 메시지는 AWS IoT Events 입력을 통해 상태 로직을 정의하는 감지기 모델로 라우팅됩니다. IAM 역할은 필요한 권한을 관리합니다.

새로운 솔루션은 데이터 수집을 AWS IoT Core 위해를 유지합니다(이제 전용 입력 및 출력 MQTT 주제 사용). 데이터 파티셔닝을 위한 Kinesis Data Streams와 상태 로직을 위한 평가자 Lambda 함수를 소개합니다. 이제 디바이스 상태가 DynamoDB 테이블에 저장되고 향상된 IAM 역할이 이러한 서비스 전반의 권한을 관리합니다.

용도 Solution 차이

데이터 수집 - IoT 디바이스에서 데이터를 수신합니다.

AWS IoT Core

이제 두 가지 MQTT 주제가 필요합니다. 하나는 디바이스 데이터를 수집하기 위한 주제이고 다른 하나는 출력 이벤트를 게시하기 위한 주제입니다.

메시지 방향 - 수신 메시지를 적절한 서비스로 라우팅합니다.

AWS IoT Core 메시지 라우팅 규칙

동일한 라우팅 기능을 유지하지만 이제 대신 Kinesis Data Streams로 메시지를 전달합니다. AWS IoT Events

데이터 처리 - 수신 데이터 스트림을 처리하고 구성합니다.

Kinesis Data Streams

AWS IoT Events 입력 기능을 대체하여 데이터 수집을 메시지 처리를 위한 디바이스 ID 파티셔닝으로 제공합니다.

로직 평가 - 상태 변경을 처리하고 작업을 트리거합니다.

평가자 Lambda

AWS IoT Events 감지기 모델을 교체하여 시각적 워크플로 대신 코드를 통해 사용자 지정 가능한 상태 로직 평가 제공

상태 관리 - 디바이스 상태를 유지합니다.

DynamoDB 테이블

디바이스 상태의 영구 스토리지를 제공하여 내부 AWS IoT Events 상태 관리를 대체하는 새로운 구성 요소

보안 - 서비스 권한을 관리합니다.

IAM 역할

업데이트된 권한에는 기존 권한 외에도 Kinesis Data Streams, DynamoDB 및 EventBridge에 대한 액세스 AWS IoT Core 가 포함됩니다.

1단계: (선택 사항) AWS IoT Events 감지기 모델 구성 내보내기

새 리소스를 생성하기 전에 AWS IoT Events 감지기 모델 정의를 내보냅니다. 여기에는 이벤트 처리 로직이 포함되며 새 솔루션을 구현하기 위한 기록 참조 역할을 할 수 있습니다.

Console

를 사용하여 다음 단계를 AWS IoT Events AWS Management Console수행하여 감지기 모델 구성을 내보냅니다.

를 사용하여 감지기 모델을 내보내려면 AWS Management Console
  1. AWS IoT Events 콘솔에 로그인합니다.

  2. 왼쪽 탐색 창에서 [Detector models(감지기 모델)]를 선택합니다.

  3. 내보낼 감지기 모델을 선택합니다.

  4. 내보내기를 선택합니다. 출력과 관련된 정보 메시지를 읽은 다음 내보내기를 다시 선택합니다.

  5. 내보내려는 각 감지기 모델에 대해 프로세스를 반복합니다.

감지기 모델의 JSON 출력이 포함된 파일이 브라우저의 다운로드 폴더에 추가됩니다. 필요에 따라 각 감지기 모델 구성을 저장하여 기록 데이터를 보존할 수 있습니다.

AWS CLI

를 사용하여 다음 명령을 AWS CLI실행하여 감지기 모델 구성을 내보냅니다.

를 사용하여 감지기 모델을 내보내려면 AWS CLI
  1. 계정의 모든 감지기 모델을 나열합니다.

    aws iotevents list-detector-models
  2. 각 감지기 모델에 대해 다음을 실행하여 구성을 내보냅니다.

    aws iotevents describe-detector-model \ --detector-model-name your-detector-model-name
  3. 각 감지기 모델의 출력을 저장합니다.

2단계 - IAM 역할 생성

의 기능을 복제할 수 있는 권한을 제공하는 IAM 역할을 생성합니다 AWS IoT Events. 이 예제의 역할은 상태 관리를 위한 DynamoDB, 예약을 위한 EventBridge, 데이터 수집을 위한 Kinesis Data Streams, 메시지 게시를 AWS IoT Core 위한 및 로깅을 위한 CloudWatch에 대한 액세스 권한을 부여합니다. 이러한 서비스는를 대체하는 역할을 합니다 AWS IoT Events.

  1. 다음 권한을 가진 IAM 역할을 만듭니다. IAM 역할 생성에 대한 자세한 지침은 IAM 사용 설명서AWS 서비스에 권한을 위임할 역할 생성을 참조하세요.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "DynamoDBAccess", "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem", "dynamodb:Query", "dynamodb:Scan" ], "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/EventsStateTable" }, { "Sid": "SchedulerAccess", "Effect": "Allow", "Action": [ "scheduler:CreateSchedule", "scheduler:DeleteSchedule" ], "Resource": "arn:aws:scheduler:your-region:your-account-id:schedule/*" }, { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "arn:aws:kinesis:your-region:your-account-id:stream/*" }, { "Sid": "IoTPublishAccess", "Effect": "Allow", "Action": "iot:Publish", "Resource": "arn:aws:iot:your-region:your-account-id:topic/*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:your-region:your-account-id:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs::your-account-id:log-group:/aws/lambda/your-lambda:*" ] } ] }
  2. 다음 IAM 역할 신뢰 정책을 추가합니다. 신뢰 정책은 지정된 AWS 서비스가 필요한 작업을 수행할 수 있도록 IAM 역할을 수임하도록 허용합니다. IAM 신뢰 정책 생성에 대한 자세한 지침은 IAM 사용 설명서사용자 지정 신뢰 정책을 사용하여 역할 생성을 참조하세요.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "scheduler.amazonaws.com", "lambda.amazonaws.com", "iot.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

3단계: Amazon Kinesis Data Streams 생성

AWS Management Console 또는를 사용하여 Amazon Kinesis Data Streams를 생성합니다 AWS CLI.

Console

를 사용하여 Kinesis 데이터 스트림을 생성하려면 Amazon Kinesis Data Streams 개발자 안내서데이터 스트림 생성 페이지에 있는 절차를 AWS Management Console따르세요.

디바이스 수와 메시지 페이로드 크기에 따라 샤드 수를 조정합니다.

AWS CLI

AWS CLI를 사용하여 Amazon Kinesis Data Streams를 생성하여 디바이스에서 데이터를 수집하고 분할합니다.

Kinesis Data Streams는이 마이그레이션에서의 데이터 수집 기능을 대체하는 데 사용됩니다 AWS IoT Events. IoT 디바이스에서 실시간 스트리밍 데이터를 수집, 처리 및 분석하는 확장 가능하고 효율적인 방법을 제공하는 동시에 유연한 데이터 처리 및 다른 AWS 서비스와의 통합을 제공합니다.

aws kinesis create-stream --stream-name your-kinesis-stream-name --shard-count 4 --region your-region

디바이스 수와 메시지 페이로드 크기에 따라 샤드 수를 조정합니다.

4단계: MQTT 메시지 라우팅 규칙 생성 또는 업데이트

새 MQTT 메시지 라우팅 규칙을 생성하거나 기존 규칙을 업데이트할 수 있습니다.

Console
  1. 새 MQTT 메시지 라우팅 규칙이 필요한지 또는 기존 규칙을 업데이트할 수 있는지 확인합니다.

  2. AWS IoT Core 콘솔을 엽니다.

  3. 탐색 창에서 메시지 라우팅을 선택한 다음 규칙을 선택합니다.

  4. 관리 섹션에서 메시지 라우팅을 선택한 다음 규칙을 선택합니다.

  5. 규칙 생성을 선택합니다.

  6. 규칙 속성 지정 페이지에서 AWS IoT Core 규칙 이름에 규칙 이름을 입력합니다. 규칙 설명 - 선택 사항에서 이벤트를 처리하고 있음을 식별하여 Kinesis Data Streams에 전달하는 설명을 입력합니다.

  7. SQL 문 구성 페이지에서 SQL 문에 다음을 입력한 후 다음을 SELECT * FROM 'your-database'선택합니다.

  8. 규칙 작업 연결 페이지의 규칙 작업에서 kinesis를 선택합니다.

  9. 스트림에 대한 Kinesis 스트림을 선택합니다. 파티션 키에 your-instance-id을 입력합니다. IAM 역할에 적합한 역할을 선택한 다음 규칙 작업 추가를 선택합니다.

자세한 내용은 디바이스 데이터를 다른 서비스로 라우팅하는 AWS IoT 규칙 생성을 참조하세요.

AWS CLI
  1. 다음 콘텐츠를 포함하는 JSON 파일을 생성합니다. 이 JSON 구성 파일은 인스턴스 ID를 파티션 키로 사용하여 주제에서 모든 메시지를 선택하고 지정된 Kinesis 스트림으로 전달하는 AWS IoT Core 규칙을 정의합니다.

    { "sql": "SELECT * FROM 'your-config-file'", "description": "Rule to process events and forward to Kinesis Data Streams", "actions": [ { "kinesis": { "streamName": "your-kinesis-stream-name", "roleArn": "arn:aws:iam::your-account-id:role/service-role/your-iam-role", "partitionKey": "${your-instance-id}" } } ], "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23" }
  2. 를 사용하여 MQTT 주제 규칙을 생성합니다 AWS CLI. 이 단계에서는 AWS CLI 를 사용하여 events_rule.json 파일에 정의된 구성을 사용하여 AWS IoT Core 주제 규칙을 생성합니다.

    aws iot create-topic-rule \ --rule-name "your-iot-core-rule" \ --topic-rule-payload file://your-file-name.json

5단계: 대상 MQTT 주제의 엔드포인트 가져오기

대상 MQTT 주제를 사용하여 주제가 발신 메시지를 게시하는 위치를 구성하고 이전에 처리한 기능을 바꿉니다 AWS IoT Events. 엔드포인트는 AWS 계정 및 리전에 고유합니다.

Console
  1. AWS IoT Core 콘솔을 엽니다.

  2. 왼쪽 탐색 패널의 연결 섹션에서 도메인 구성을 선택합니다.

  3. iot:Data-ATS 도메인 구성을 선택하여 구성의 세부 정보 페이지를 엽니다.

  4. 도메인 이름 값을 복사합니다. 이 값은 엔드포인트입니다. 엔드포인트 값은 이후 단계에서 필요하므로 저장합니다.

AWS CLI

다음 명령을 실행하여 계정에 대한 발신 메시지를 게시하기 위한 AWS IoT Core 엔드포인트를 가져옵니다.

aws iot describe-endpoint --endpoint-type iot:Data-ATS --region your-region

6단계: Amazon DynamoDB 테이블 생성

Amazon DynamoDB 테이블은의 상태 관리 기능을 대체하여 새로운 솔루션 아키텍처에서 디바이스의 상태와 감지기 모델 로직을 지속하고 관리할 수 있는 확장 가능하고 유연한 방법을 AWS IoT Events제공합니다.

Console

Amazon DynamoDB 테이블을 생성하여 감지기 모델의 상태를 유지합니다. 자세한 내용은 Amazon DynamoDB 개발자 안내서의 DynamoDB에서 테이블 생성을 참조하세요. DynamoDB

테이블 세부 정보는 다음을 사용합니다.

  • 테이블 이름에 선택한 테이블 이름을 입력합니다.

  • 파티션 키에 자체 인스턴스 ID를 입력합니다.

  • 테이블 설정에 기본 설정을 사용할 수 있습니다.

AWS CLI

다음 명령을 실행하여 DynamoDB 테이블을 생성합니다.

aws dynamodb create-table \ --table-name your-table-name \ --attribute-definitions AttributeName=your-instance-id,AttributeType=S \ --key-schema AttributeName=your-instance-id,KeyType=HASH \

7단계: AWS Lambda 함수 생성(콘솔)

Lambda 함수는의 감지기 모델 평가 로직을 대체하는 코어 처리 엔진 역할을 합니다 AWS IoT Events. 이 예제에서는 다른 AWS 서비스와 통합되어 수신 데이터를 처리하고, 상태를 관리하고, 정의된 규칙에 따라 작업을 트리거합니다.

NodeJS 런타임을 사용하여 Lambda 함수를 생성합니다. 하드 코딩된 상수를 대체하여 다음 코드 조각을 사용합니다.

  1. AWS Lambda console을 엽니다.

  2. 함수 생성을 선택합니다.

  3. 함수 이름의 이름을 입력합니다.

  4. NodeJS 22.x런타임으로 선택합니다.

  5. 기본 실행 역할 변경 드롭다운에서 기존 역할 사용을 선택한 다음 이전 단계에서 생성한 IAM 역할을 선택합니다.

  6. 함수 생성을 선택합니다.

  7. 하드 코딩된 상수를 교체한 후 다음 코드 조각을 붙여 넣습니다.

  8. 함수가 생성된 후 코드 탭에서 다음 코드 예제를 붙여넣고 your-destination-endpoint 엔드포인트를 사용자 고유의 것으로 바꿉니다.

import { DynamoDBClient, GetItemCommand } from '@aws-sdk/client-dynamodb'; import { PutItemCommand } from '@aws-sdk/client-dynamodb'; import { IoTDataPlaneClient, PublishCommand } from "@aws-sdk/client-iot-data-plane"; import { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } from "@aws-sdk/client-scheduler"; // ES Modules import //// External Clients and Constants const scheduler = new SchedulerClient({}); const iot = new IoTDataPlaneClient({ endpoint: 'https://your-destination-endpoint-ats.iot.your-region.amazonaws.com/' }); const ddb = new DynamoDBClient({}); //// Lambda Handler function export const handler = async (event) => { console.log('Incoming event:', JSON.stringify(event, null, 2)); if (!event.Records) { throw new Error('No records found in event'); } const processedRecords = []; for (const record of event.Records) { try { if (record.eventSource !== 'aws:kinesis') { console.log(`Skipping non-Kinesis record from ${record.eventSource}`); continue; } // Assumes that we are processing records from Kinesis const payload = record.kinesis.data; const decodedData = Buffer.from(payload, 'base64').toString(); console.log("decoded payload is ", decodedData); const output = await handleDecodedData(decodedData); // Add additional processing logic here const processedData = { output, sequenceNumber: record.kinesis.sequenceNumber, partitionKey: record.kinesis.partitionKey, timestamp: record.kinesis.approximateArrivalTimestamp }; processedRecords.push(processedData); } catch (error) { console.error('Error processing record:', error); console.error('Failed record:', record); // Decide whether to throw error or continue processing other records // throw error; // Uncomment to stop processing on first error } } return { statusCode: 200, body: JSON.stringify({ message: 'Processing complete', processedCount: processedRecords.length, records: processedRecords }) }; }; // Helper function to handle decoded data async function handleDecodedData(payload) { try { // Parse the decoded data const parsedData = JSON.parse(payload); // Extract instanceId const instanceId = parsedData.instanceId; // Parse the input field const inputData = JSON.parse(parsedData.payload); const temperature = inputData.temperature; console.log('For InstanceId: ', instanceId, ' the temperature is:', temperature); await iotEvents.process(instanceId, inputData) return { instanceId, temperature, // Add any other fields you want to return rawInput: inputData }; } catch (error) { console.error('Error handling decoded data:', error); throw error; } } //// Classes for declaring/defining the state machine class CurrentState { constructor(instanceId, stateName, variables, inputs) { this.stateName = stateName; this.variables = variables; this.inputs = inputs; this.instanceId = instanceId } static async load(instanceId) { console.log(`Loading state for id ${instanceId}`); try { const { Item: { state: { S: stateContent } } } = await ddb.send(new GetItemCommand({ TableName: 'EventsStateTable', Key: { 'InstanceId': { S: `${instanceId}` } } })); const { stateName, variables, inputs } = JSON.parse(stateContent); return new CurrentState(instanceId, stateName, variables, inputs); } catch (e) { console.log(`No state for id ${instanceId}: ${e}`); return undefined; } } static async save(instanceId, state) { console.log(`Saving state for id ${instanceId}`); await ddb.send(new PutItemCommand({ TableName: 'your-events-state-table-name', Item: { 'InstanceId': { S: `${instanceId}` }, 'state': { S: state } } })); } setVariable(name, value) { this.variables[name] = value; } changeState(stateName) { console.log(`Changing state from ${this.stateName} to ${stateName}`); this.stateName = stateName; } async setTimer(instanceId, frequencyInMinutes, payload) { console.log(`Setting timer ${instanceId} for frequency of ${frequencyInMinutes} minutes`); const base64Payload = Buffer.from(JSON.stringify(payload)).toString(); console.log(base64Payload); const scheduleName = `your-schedule-name-${instanceId}-schedule`; const scheduleParams = { Name: scheduleName, FlexibleTimeWindow: { Mode: 'OFF' }, ScheduleExpression: `rate(${frequencyInMinutes} minutes)`, Target: { Arn: "arn:aws::kinesis:your-region:your-account-id:stream/your-kinesis-stream-name", RoleArn: "arn:aws::iam::your-account-id:role/service-role/your-iam-role", Input: base64Payload, KinesisParameters: { PartitionKey: instanceId, }, RetryPolicy: { MaximumRetryAttempts: 3 } }, }; const command = new CreateScheduleCommand(scheduleParams); console.log(`Sending command to set timer ${JSON.stringify(command)}`); await scheduler.send(command); } async clearTimer(instanceId) { console.log(`Cleaning timer ${instanceId}`); const scheduleName = `your-schedule-name-${instanceId}-schdule`; const command = new DeleteScheduleCommand({ Name: scheduleName }); await scheduler.send(command); } async executeAction(actionType, actionPayload) { console.log(`Will execute the ${actionType} with payload ${actionPayload}`); await iot.send(new PublishCommand({ topic: `${this.instanceId}`, payload: actionPayload, qos: 0 })); } setInput(value) { this.inputs = { ...this.inputs, ...value }; } input(name) { return this.inputs[name]; } } class IoTEvents { constructor(initialState) { this.initialState = initialState; this.states = {}; } state(name) { const state = new IoTEventsState(); this.states[name] = state; return state; } async process(instanceId, input) { let currentState = await CurrentState.load(instanceId) || new CurrentState(instanceId, this.initialState, {}, {}); currentState.setInput(input); console.log(`With inputs as: ${JSON.stringify(currentState)}`); const state = this.states[currentState.stateName]; currentState = await state.evaluate(currentState); console.log(`With output as: ${JSON.stringify(currentState)}`); await CurrentState.save(instanceId, JSON.stringify(currentState)); } } class Event { constructor(condition, action) { this.condition = condition; this.action = action; } } class IoTEventsState { constructor() { this.eventsList = [] } events(eventListArg) { this.eventsList.push(...eventListArg); return this; } async evaluate(currentState) { for (const e of this.eventsList) { console.log(`Evaluating event ${e.condition}`); if (e.condition(currentState)) { console.log(`Event condition met`); // Execute any action as defined in iotEvents DM Definition await e.action(currentState); } } return currentState; } } ////// DetectorModel Definitions - replace with your own defintions let processAlarmStateEvent = new Event( (currentState) => { const source = currentState.input('source'); return ( currentState.input('temperature') < 70 ); }, async (currentState) => { currentState.changeState('normal'); await currentState.clearTimer(currentState.instanceId) await currentState.executeAction('MQTT', `{"state": "alarm cleared, timer deleted" }`); } ); let processTimerEvent = new Event( (currentState) => { const source = currentState.input('source'); console.log(`Evaluating timer event with source ${source}`); const booleanOutput = (source !== undefined && source !== null && typeof source === 'string' && source.toLowerCase() === 'timer' && // check if the currentState == state from the timer payload currentState.input('currentState') !== undefined && currentState.input('currentState') !== null && currentState.input('currentState').toLowerCase !== 'normal'); console.log(`Timer event evaluated as ${booleanOutput}`); return booleanOutput; }, async (currentState) => { await currentState.executeAction('MQTT', `{"state": "timer timed out in Alarming state" }`); } ); let processNormalEvent = new Event( (currentState) => currentState.input('temperature') > 70, async (currentState) => { currentState.changeState('alarm'); await currentState.executeAction('MQTT', `{"state": "alarm detected, timer started" }`); await currentState.setTimer(currentState.instanceId, 5, { "instanceId": currentState.instanceId, "payload":"{\"currentState\": \"alarm\", \"source\": \"timer\"}" }); } ); const iotEvents = new IoTEvents('normal'); iotEvents .state('normal') .events( [ processNormalEvent ]); iotEvents .state('alarm') .events([ processAlarmStateEvent, processTimerEvent ] );

8단계: Amazon Kinesis Data Streams 트리거 추가

AWS Management Console 또는를 사용하여 Lambda 함수에 Kinesis Data Streams 트리거를 추가합니다 AWS CLI.

Lambda 함수에 Kinesis Data Streams 트리거를 추가하면 데이터 수집 파이프라인과 처리 로직 간의 연결이 설정되므로가 입력을 AWS IoT Events 처리하는 방식과 마찬가지로 들어오는 IoT 데이터 스트림을 자동으로 평가하고 이벤트에 실시간으로 대응할 수 있습니다.

Console

자세한 내용은 AWS Lambda 개발자 안내서이벤트 소스 매핑 생성을 참조하여 Lambda 함수를 호출하세요.

이벤트 소스 매핑 세부 정보에 다음을 사용합니다.

  • 함수 이름에에 사용된 Lambda 이름을 입력합니다7단계: AWS Lambda 함수 생성(콘솔).

  • 소비자 - 선택 사항에서 Kinesis 스트림의 ARN을 입력합니다.

  • 배치 크기10을 입력합니다.

AWS CLI

다음 명령을 실행하여 Lambda 함수 트리거를 생성합니다.

aws lambda create-event-source-mapping \ --function-name your-lambda-name \ --event-source arn:aws:kinesis:your-region:your-account-id:stream/your-kinesis-stream-name \ --batch-size 10 \ --starting-position LATEST \ --region your-region

9단계: 데이터 수집 및 출력 기능 테스트(AWS CLI)

감지기 모델에서 정의한 내용을 기반으로 MQTT 주제에 페이로드를 게시합니다. 다음은 구현을 테스트하기 위한 MQTT 주제의 your-topic-name 페이로드 예제입니다.

{ "instanceId": "your-instance-id", "payload": "{\"temperature\":78}" }

다음(또는 유사한) 콘텐츠가 포함된 주제에 게시된 MQTT 메시지가 표시되어야 합니다.

{ "state": "alarm detected, timer started" }