Set up a new device to test V1 applications on V2 (Greengrass nucleus lite) - AWS IoT Greengrass

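Use Greengrass nucleus lite to set up a new device for testing the generic components that you create to migrate your AWS IoT Greengrass V1 applications to V2. Greengrass nucleus lite is a lightweight runtime optimized for resource-constrained devices. You can use this device to develop and test custom Greengrass components that run native processes. After you test your applications on a Greengrass nucleus lite device, you can deploy the V2 components to other devices that run Greengrass nucleus lite, or upgrade your existing V1 core devices to V2.

Step 1: Install Greengrass nucleus lite on a new device

Follow the Greengrass nucleus lite installation guide to install and set up Greengrass nucleus lite on the new device.

Note

Greengrass nucleus lite doesn't currently support the local shadow service, client devices, or connectors. Before you continue with this guide, make sure that your V1 applications don't depend on these features.

Step 2: Create and deploy generic components to migrate AWS IoT Greengrass V1 Lambda functions

To replicate the functionality of your AWS IoT Greengrass V1 Lambda functions on Greengrass nucleus lite, you must convert them to generic components. This involves rewriting your Lambda function code to use the AWS IoT Device SDK V2 or the AWS IoT Greengrass Component SDK instead of the AWS IoT Greengrass Core SDK.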
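One way to keep this rewrite small is to separate the decision logic from the messaging calls, so that only the thin SDK layer changes between V1 and V2. The following is a minimal sketch of that idea, not part of either SDK: `build_alert` and `HIGH_TEMPERATURE_THRESHOLD` are hypothetical names, and the threshold and field names are taken from the examples later in this guide.

```python
from typing import Optional

# Assumption: same threshold that the examples in this guide use.
HIGH_TEMPERATURE_THRESHOLD = 80


def build_alert(reading: dict) -> Optional[dict]:
    """Return an alert payload for a hot reading, or None if no alert is needed.

    Hypothetical helper: it contains no Greengrass calls, so a V1 Lambda
    handler and a V2 component callback could both call it unchanged.
    """
    temperature = reading["temperature"]
    if temperature <= HIGH_TEMPERATURE_THRESHOLD:
        return None
    return {
        "sensor_id": reading["sensor_id"],
        "temperature": temperature,
        "alert": "HIGH_TEMPERATURE",
    }


if __name__ == "__main__":
    print(build_alert({"sensor_id": "sensor1", "temperature": 85}))
    print(build_alert({"sensor_id": "sensor1", "temperature": 72}))
```

With the logic isolated like this, the migration mostly consists of replacing the code that receives the event and the code that publishes the result.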

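The V2 component examples in this guide use the following SDKs:

  • Python: AWS IoT Device SDK for Python (awsiotsdk)

  • Java: AWS IoT Device SDK for Java (aws-iot-device-sdk)

  • JavaScript: AWS IoT Device SDK for JavaScript (aws-iot-device-sdk-v2)

  • C and C++: AWS IoT Greengrass Component SDK (aws-greengrass-component-sdk)

The following examples show Lambda functions that use the AWS IoT Greengrass V1 Core SDK and their equivalent generic components. For each of two main scenarios, they include component code, recipes, and build instructions in several programming languages:

  • Local communication: components that use local pub/sub to communicate with other components on the same device

  • Cloud communication: components that communicate with AWS IoT Core or other AWS services

Scenario 1: Local communication (publisher → processor → subscriber)

This scenario shows how to convert V1 Lambda functions that use local pub/sub communication into V2 generic components.

Application architecture

This example involves three components:

  • The publisher Lambda publishes temperature data

  • The processor Lambda receives and processes the data

  • The processor Lambda publishes the processed results to the subscriber Lambda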
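To make this message flow concrete, here is a small simulation that uses only the Python standard library. It is a sketch for reasoning about the architecture, not Greengrass code: `LocalBus` and `run_pipeline` are hypothetical names, delivery is synchronous, and topics are matched exactly. The topic names and the threshold of 80 come from the V1 examples in this guide.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class LocalBus:
    """Tiny in-memory stand-in for local pub/sub (not a Greengrass API)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, message: dict) -> None:
        # Deliver synchronously to every callback registered for this exact topic.
        for callback in list(self._subscribers.get(topic, [])):
            callback(message)


def run_pipeline(readings: List[dict]) -> List[dict]:
    """Push readings through publisher -> processor -> subscriber; return alerts."""
    bus = LocalBus()
    received_alerts: List[dict] = []

    def processor(reading: dict) -> None:
        # Processor role: forward only high-temperature readings.
        if reading["temperature"] > 80:
            bus.publish("lambda/alerts", {**reading, "alert": "HIGH_TEMPERATURE"})

    bus.subscribe("sensors/temperature", processor)
    # Subscriber role: collect whatever the processor forwards.
    bus.subscribe("lambda/alerts", received_alerts.append)

    # Publisher role: emit each reading on the sensor topic.
    for reading in readings:
        bus.publish("sensors/temperature", reading)
    return received_alerts


if __name__ == "__main__":
    print(run_pipeline([
        {"sensor_id": "sensor1", "temperature": 72},
        {"sensor_id": "sensor2", "temperature": 95},
    ]))
```

Only the second reading exceeds the threshold, so only that reading reaches the subscriber.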

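The following code examples focus on the processor Lambda, which demonstrates both subscribing to and publishing messages for local communication.

V1 group subscriptions

In AWS IoT Greengrass V1, the following group subscriptions enable communication between the Lambda functions:

Subscription 1: Publisher → Processor

  • Source: Lambda (Publisher)

  • Target: Lambda (Processor)

  • Topic: sensors/temperature

Subscription 2: Processor → Subscriber

  • Source: Lambda (Processor)

  • Target: Lambda (Subscriber)

  • Topic: lambda/alerts

Processor Lambda function (V1)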

Python
import greengrasssdk
import json

iot_client = greengrasssdk.client('iot-data')


def lambda_handler(event, context):
    """
    Receives temperature from publisher Lambda,
    processes it, and forwards to subscriber Lambda
    """
    # Receive from publisher Lambda.
    sensor_id = event['sensor_id']
    temperature = event['temperature']
    print(f"Received from sensor {sensor_id}: {temperature}°F")

    # Process: Check if temperature is high.
    if temperature > 80:
        alert_data = {
            'sensor_id': sensor_id,
            'temperature': temperature,
            'alert': 'HIGH_TEMPERATURE'
        }

        # Publish to another Lambda using greengrasssdk.
        iot_client.publish(
            topic='lambda/alerts',
            payload=json.dumps(alert_data)
        )
        print("Alert sent to subscriber Lambda")

    return {'statusCode': 200}
Java
import com.amazonaws.greengrass.javasdk.IotDataClient;
import com.amazonaws.greengrass.javasdk.model.PublishRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.google.gson.Gson;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TemperatureProcessorLambda {
    private static final Gson gson = new Gson();
    private final IotDataClient iotDataClient;

    public TemperatureProcessorLambda() {
        this.iotDataClient = new IotDataClient();
    }

    public String handleRequest(Map<String, Object> event, Context context) {
        /*
         * Receives temperature from publisher Lambda,
         * processes it, and forwards to subscriber Lambda
         */

        // Receive from publisher Lambda.
        String sensorId = (String) event.get("sensor_id");
        Number temp = (Number) event.get("temperature");
        int temperature = temp.intValue();
        System.out.println("Received from sensor " + sensorId + ": " + temperature + "°F");

        // Process: Check if temperature is high.
        if (temperature > 80) {
            Map<String, Object> alertData = new HashMap<>();
            alertData.put("sensor_id", sensorId);
            alertData.put("temperature", temperature);
            alertData.put("alert", "HIGH_TEMPERATURE");

            // Publish to another Lambda using greengrasssdk.
            String payload = gson.toJson(alertData);
            PublishRequest publishRequest = new PublishRequest()
                .withTopic("lambda/alerts")
                .withPayload(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
            iotDataClient.publish(publishRequest);
            System.out.println("Alert sent to subscriber Lambda");
        }

        return "Success";
    }
}
JavaScript
const greengrasssdk = require('aws-greengrass-core-sdk');

const iotClient = new greengrasssdk.IotData();

/**
 * Greengrass v1 Lambda function
 * Receives temperature from publisher Lambda,
 * processes it, and forwards to subscriber Lambda
 */
exports.handler = function(event, context) {
    // Receive from publisher Lambda.
    const sensorId = event.sensor_id;
    const temperature = event.temperature;
    console.log(`Received from sensor ${sensorId}: ${temperature}°F`);

    // Process: Check if temperature is high.
    if (temperature > 80) {
        const alertData = {
            sensor_id: sensorId,
            temperature: temperature,
            alert: 'HIGH_TEMPERATURE'
        };

        // Publish to another Lambda using greengrasssdk.
        const params = {
            topic: 'lambda/alerts',
            payload: JSON.stringify(alertData)
        };

        iotClient.publish(params, (err) => {
            if (err) {
                console.error('Error publishing alert:', err);
                context.fail(err);
            } else {
                console.log('Alert sent to subscriber Lambda');
                context.succeed('Success');
            }
        });
    } else {
        context.succeed('Success');
    }
};
C
#include <aws/greengrass/greengrasssdk.h> #include <stdio.h> #include <string.h> #include <jansson.h> // For JSON parsing. static aws_greengrass_iot_data_client *iot_client = NULL; void on_message_received(const char *topic, const uint8_t *payload, size_t payload_len, void *user_data) { // Parse the incoming message. char *payload_str = strndup((char *)payload, payload_len); json_error_t error; json_t *event = json_loads(payload_str, 0, &error); free(payload_str); if (!event) { fprintf(stderr, "Error parsing JSON: %s\n", error.text); return; } // Receive from publisher Lambda. json_t *sensor_id_obj = json_object_get(event, "sensor_id"); json_t *temperature_obj = json_object_get(event, "temperature"); const char *sensor_id = json_string_value(sensor_id_obj); int temperature = json_integer_value(temperature_obj); printf("Received from sensor %s: %d°F\n", sensor_id, temperature); // Process: Check if temperature is high. if (temperature > 80) { // Create alert data. json_t *alert_data = json_object(); json_object_set_new(alert_data, "sensor_id", json_string(sensor_id)); json_object_set_new(alert_data, "temperature", json_integer(temperature)); json_object_set_new(alert_data, "alert", json_string("HIGH_TEMPERATURE")); // Convert to JSON string. char *alert_payload = json_dumps(alert_data, JSON_COMPACT); // Publish to another Lambda using greengrasssdk. aws_greengrass_publish_params params = { .topic = "lambda/alerts", .payload = (uint8_t *)alert_payload, .payload_len = strlen(alert_payload) }; aws_greengrass_iot_data_publish(iot_client, &params); printf("Alert sent to subscriber Lambda\n"); free(alert_payload); json_decref(alert_data); } json_decref(event); } int main(int argc, char *argv[]) { // Initialize Greengrass SDK. iot_client = aws_greengrass_iot_data_client_new(); // Subscribe to temperature sensor topic. 
aws_greengrass_subscribe_params subscribe_params = { .topic = "sensors/temperature", .callback = on_message_received, .user_data = NULL }; aws_greengrass_iot_data_subscribe(iot_client, &subscribe_params); printf("Temperature Processor Lambda started\n"); printf("Subscribed to sensors/temperature\n"); printf("Waiting for sensor data...\n"); // Keep the Lambda running. while (1) { sleep(1); } return 0; }
C++
#include <aws/greengrass/greengrasssdk.h> #include <iostream> #include <string> #include <memory> #include <jansson.h> // For JSON parsing. #include <unistd.h> class TemperatureProcessor { private: std::unique_ptr<aws_greengrass_iot_data_client, decltype(&aws_greengrass_iot_data_client_destroy)> iot_client; static void message_callback_wrapper(const char *topic, const uint8_t *payload, size_t payload_len, void *user_data) { auto* processor = static_cast<TemperatureProcessor*>(user_data); processor->on_message_received(topic, payload, payload_len); } public: TemperatureProcessor() : iot_client(aws_greengrass_iot_data_client_new(), aws_greengrass_iot_data_client_destroy) { if (!iot_client) { throw std::runtime_error("Failed to create Greengrass IoT client"); } } void on_message_received(const char *topic, const uint8_t *payload, size_t payload_len) { // Parse the incoming message. std::string payload_str(reinterpret_cast<const char*>(payload), payload_len); json_error_t error; json_t *event = json_loads(payload_str.c_str(), 0, &error); if (!event) { std::cerr << "Error parsing JSON: " << error.text << std::endl; return; } json_t *sensor_id_obj = json_object_get(event, "sensor_id"); json_t *temperature_obj = json_object_get(event, "temperature"); const char *sensor_id = json_string_value(sensor_id_obj); int temperature = json_integer_value(temperature_obj); std::cout << "Received from sensor " << sensor_id << ": " << temperature << "°F" << std::endl; if (temperature > 80) { send_alert(sensor_id, temperature); } json_decref(event); } void send_alert(const char *sensor_id, int temperature) { // Create alert data. json_t *alert_data = json_object(); json_object_set_new(alert_data, "sensor_id", json_string(sensor_id)); json_object_set_new(alert_data, "temperature", json_integer(temperature)); json_object_set_new(alert_data, "alert", json_string("HIGH_TEMPERATURE")); // Convert to JSON string. 
char *alert_payload = json_dumps(alert_data, JSON_COMPACT); // Publish to another Lambda using greengrasssdk. aws_greengrass_publish_params params = { .topic = "lambda/alerts", .payload = reinterpret_cast<uint8_t*>(alert_payload), .payload_len = strlen(alert_payload) }; aws_greengrass_iot_data_publish(iot_client.get(), &params); std::cout << "Alert sent to subscriber Lambda" << std::endl; free(alert_payload); json_decref(alert_data); } void subscribe_to_topic(const std::string& topic) { aws_greengrass_subscribe_params subscribe_params = { .topic = topic.c_str(), .callback = message_callback_wrapper, .user_data = this }; aws_greengrass_iot_data_subscribe(iot_client.get(), &subscribe_params); std::cout << "Temperature Processor Lambda started" << std::endl; std::cout << "Subscribed to " << topic << std::endl; std::cout << "Waiting for sensor data..." << std::endl; } void run() { // Keep the Lambda running. while (true) { sleep(1); } } }; int main(int argc, char *argv[]) { try { TemperatureProcessor processor; processor.subscribe_to_topic("sensors/temperature"); processor.run(); } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; return 1; } return 0; }

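Generic component (V2)

To achieve the same functionality in AWS IoT Greengrass V2, create a generic component with the following:

1. Component code
Python

Prerequisites: Before you use this component code, install and verify the AWS IoT Device SDK for Python on your Greengrass device:

# Install the SDK
pip3 install awsiotsdk

# Verify installation
python3 -c "import awsiot.greengrasscoreipc.clientv2; print('SDK installed successfully')"

If you encounter dependency conflicts during installation, try installing a specific version of the AWS IoT Device SDK.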
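A component can also perform a similar check at startup and fail with a clear message instead of an import traceback. The following is a minimal sketch that uses only the standard library; `sdk_available` is a hypothetical helper name, and the module path is the one used by the verification command in this guide.

```python
import importlib.util


def sdk_available(module_name: str = "awsiot.greengrasscoreipc.clientv2") -> bool:
    """Return True if the named module can be located on this device.

    Parent packages are imported during the lookup; the target module
    itself is only located, not executed.
    """
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example "awsiot") is missing.
        return False


if __name__ == "__main__":
    if not sdk_available():
        raise SystemExit("AWS IoT Device SDK for Python not found; run: pip3 install awsiotsdk")
```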

If the verification command prints "SDK installed successfully", you can use the component code:

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    PublishMessage,
    JsonMessage
)
import time

ipc_client = GreengrassCoreIPCClientV2()


def on_sensor_data(event):
    """
    Receives temperature from sensor publisher component,
    processes it, and forwards to alert component
    """
    try:
        # Receive from publisher component.
        data = event.json_message.message
        sensor_id = data['sensor_id']
        temperature = data['temperature']
        print(f"Received from sensor {sensor_id}: {temperature}°F")

        # Process: Check if temperature is high.
        if temperature > 80:
            alert_data = {
                'sensor_id': sensor_id,
                'temperature': temperature,
                'alert': 'HIGH_TEMPERATURE'
            }

            # Publish to another component (AlertHandler).
            ipc_client.publish_to_topic(
                topic='component/alerts',
                publish_message=PublishMessage(
                    json_message=JsonMessage(message=alert_data)
                )
            )
            print("Alert sent to AlertHandler component")
    except Exception as e:
        print(f"Error processing sensor data: {e}")


def main():
    print("Temperature Processor component starting...")

    # Subscribe to sensor data from publisher component.
    ipc_client.subscribe_to_topic(
        topic='sensors/temperature',
        on_stream_event=on_sensor_data
    )
    print("Subscribed to sensors/temperature")
    print("Waiting for sensor data...")

    # Keep running.
    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()
Java
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.PublishMessage; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.JsonMessage; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import java.util.HashMap; import java.util.Map; import java.util.Optional; public class TemperatureProcessor { private static GreengrassCoreIPCClientV2 ipcClient; public static void main(String[] args) { System.out.println("Temperature Processor component starting..."); try (GreengrassCoreIPCClientV2 client = GreengrassCoreIPCClientV2.builder().build()) { ipcClient = client; SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest() .withTopic("sensors/temperature"); ipcClient.subscribeToTopic( subscribeRequest, TemperatureProcessor::onSensorData, Optional.empty(), Optional.empty() ); System.out.println("Subscribed to sensors/temperature"); System.out.println("Waiting for sensor data..."); while (true) { Thread.sleep(1000); } } catch (Exception e) { System.err.println("Error: " + e.getMessage()); e.printStackTrace(); } } public static void onSensorData(SubscriptionResponseMessage message) { try { Map<String, Object> data = message.getJsonMessage().getMessage(); String sensorId = (String) data.get("sensor_id"); Number temp = (Number) data.get("temperature"); int temperature = temp.intValue(); System.out.println("Received from sensor " + sensorId + ": " + temperature + "F"); if (temperature > 80) { Map<String, Object> alertData = new HashMap<>(); alertData.put("sensor_id", sensorId); alertData.put("temperature", temperature); alertData.put("alert", "HIGH_TEMPERATURE"); JsonMessage jsonMessage = new JsonMessage().withMessage(alertData); PublishMessage publishMessage = new PublishMessage().withJsonMessage(jsonMessage); 
PublishToTopicRequest publishRequest = new PublishToTopicRequest() .withTopic("component/alerts") .withPublishMessage(publishMessage); ipcClient.publishToTopic(publishRequest); System.out.println("Alert sent to AlertHandler component"); } } catch (Exception e) { System.err.println("Error processing sensor data: " + e.getMessage()); } } }
JavaScript
const greengrasscoreipc = require('aws-iot-device-sdk-v2').greengrasscoreipc;

class TemperatureProcessor {
    constructor() {
        this.ipcClient = null;
    }

    async start() {
        console.log('Temperature Processor component starting...');
        try {
            this.ipcClient = greengrasscoreipc.createClient();
            await this.ipcClient.connect();

            const subscribeRequest = {
                topic: 'sensors/temperature'
            };
            const streamingOperation = this.ipcClient.subscribeToTopic(subscribeRequest);

            streamingOperation.on('message', (message) => {
                this.onSensorData(message);
            });
            streamingOperation.on('streamError', (error) => {
                console.error('Stream error:', error);
            });
            streamingOperation.on('ended', () => {
                console.log('Subscription stream ended');
            });

            await streamingOperation.activate();
            console.log('Subscribed to sensors/temperature');
            console.log('Waiting for sensor data...');
        } catch (error) {
            console.error('Error starting component:', error);
            process.exit(1);
        }
    }

    async onSensorData(message) {
        try {
            const data = message.jsonMessage.message;
            const sensorId = data.sensor_id;
            const temperature = data.temperature;
            console.log(`Received from sensor ${sensorId}: ${temperature}°F`);

            if (temperature > 80) {
                const alertData = {
                    sensor_id: sensorId,
                    temperature: temperature,
                    alert: 'HIGH_TEMPERATURE'
                };
                const publishRequest = {
                    topic: 'component/alerts',
                    publishMessage: {
                        jsonMessage: {
                            message: alertData
                        }
                    }
                };
                await this.ipcClient.publishToTopic(publishRequest);
                console.log('Alert sent to AlertHandler component');
            }
        } catch (error) {
            console.error('Error processing sensor data:', error);
        }
    }
}

// Start the component.
const processor = new TemperatureProcessor();
processor.start();
C
#include <gg/buffer.h>
#include <gg/error.h>
#include <gg/ipc/client.h>
#include <gg/map.h>
#include <gg/object.h>
#include <gg/sdk.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#define SUBSCRIBE_TOPIC "sensors/temperature"
#define PUBLISH_TOPIC "component/alerts"

typedef struct {
    char sensor_id[64];
    int64_t temperature;
} AlertData;

static pthread_mutex_t alert_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t alert_cond = PTHREAD_COND_INITIALIZER;
static AlertData pending_alert;
static bool has_pending_alert = false;

static void *alert_publisher_thread(void *arg) {
    (void) arg;

    while (true) {
        pthread_mutex_lock(&alert_mutex);
        while (!has_pending_alert) {
            pthread_cond_wait(&alert_cond, &alert_mutex);
        }
        AlertData alert = pending_alert;
        has_pending_alert = false;
        pthread_mutex_unlock(&alert_mutex);

        GgBuffer sensor_id_buf = {
            .data = (uint8_t *)alert.sensor_id,
            .len = strlen(alert.sensor_id)
        };

        GgMap payload = GG_MAP(
            gg_kv(GG_STR("sensor_id"), gg_obj_buf(sensor_id_buf)),
            gg_kv(GG_STR("temperature"), gg_obj_i64(alert.temperature)),
            gg_kv(GG_STR("alert"), gg_obj_buf(GG_STR("HIGH_TEMPERATURE")))
        );

        GgError ret = ggipc_publish_to_topic_json(GG_STR(PUBLISH_TOPIC), payload);
        if (ret != GG_ERR_OK) {
            fprintf(stderr, "Failed to publish alert\n");
        } else {
            printf("Alert sent to AlertHandler component\n");
        }
    }

    return NULL;
}

static void on_sensor_data(
    void *ctx, GgBuffer topic, GgObject payload, GgIpcSubscriptionHandle handle
) {
    (void) ctx;
    (void) topic;
    (void) handle;

    if (gg_obj_type(payload) != GG_TYPE_MAP) {
        fprintf(stderr, "Expected JSON message\n");
        return;
    }

    GgMap map = gg_obj_into_map(payload);

    GgObject *sensor_id_obj;
    if (!gg_map_get(map, GG_STR("sensor_id"), &sensor_id_obj)) {
        fprintf(stderr, "Missing sensor_id field\n");
        return;
    }
    GgBuffer sensor_id = gg_obj_into_buf(*sensor_id_obj);

    GgObject *temperature_obj;
    if (!gg_map_get(map, GG_STR("temperature"), &temperature_obj)) {
        fprintf(stderr, "Missing temperature field\n");
        return;
    }
    int64_t temperature = gg_obj_into_i64(*temperature_obj);

    printf("Received from sensor %.*s: %lld°F\n",
           (int)sensor_id.len, sensor_id.data, (long long)temperature);

    if (temperature > 80) {
        pthread_mutex_lock(&alert_mutex);
        snprintf(pending_alert.sensor_id, sizeof(pending_alert.sensor_id),
                 "%.*s", (int)sensor_id.len, sensor_id.data);
        pending_alert.temperature = temperature;
        has_pending_alert = true;
        pthread_cond_signal(&alert_cond);
        pthread_mutex_unlock(&alert_mutex);
    }
}

int main(void) {
    setvbuf(stdout, NULL, _IONBF, 0);
    printf("Temperature Processor component starting...\n");

    gg_sdk_init();

    GgError ret = ggipc_connect();
    if (ret != GG_ERR_OK) {
        fprintf(stderr, "Failed to connect to Greengrass nucleus\n");
        exit(1);
    }
    printf("Connected to Greengrass IPC\n");

    // Start alert publisher thread.
    pthread_t alert_thread;
    if (pthread_create(&alert_thread, NULL, alert_publisher_thread, NULL) != 0) {
        fprintf(stderr, "Failed to create alert publisher thread\n");
        exit(1);
    }

    GgIpcSubscriptionHandle handle;
    ret = ggipc_subscribe_to_topic(
        GG_STR(SUBSCRIBE_TOPIC), &on_sensor_data, NULL, &handle
    );
    if (ret != GG_ERR_OK) {
        fprintf(stderr, "Failed to subscribe to topic\n");
        exit(1);
    }
    printf("Subscribed to %s\n", SUBSCRIBE_TOPIC);
    printf("Waiting for sensor data...\n");

    // Keep running.
    while (true) {
        sleep(1);
    }

    return 0;
}
C++
#include <gg/ipc/client.hpp>
#include <gg/buffer.hpp>
#include <gg/object.hpp>
#include <gg/types.hpp>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <string_view>
#include <thread>

struct AlertData {
    std::string sensor_id;
    int64_t temperature;
};

static std::mutex alert_mutex;
static std::condition_variable alert_cv;
static AlertData pending_alert;
static bool has_pending_alert = false;

void alert_publisher_thread() {
    auto& client = gg::ipc::Client::get();

    while (true) {
        std::unique_lock<std::mutex> lock(alert_mutex);
        alert_cv.wait(lock, [] { return has_pending_alert; });
        AlertData alert = pending_alert;
        has_pending_alert = false;
        lock.unlock();

        // Create alert payload as JSON string.
        std::string json_payload = "{\"sensor_id\":\"" + alert.sensor_id +
            "\",\"temperature\":" + std::to_string(alert.temperature) +
            ",\"alert\":\"HIGH_TEMPERATURE\"}";

        // Convert to Buffer and publish.
        gg::Buffer buffer(json_payload);
        auto error = client.publish_to_topic("component/alerts", buffer);
        if (error) {
            std::cerr << "Failed to publish alert\n";
        } else {
            std::cout << "Alert sent to AlertHandler component\n";
        }
    }
}

class SensorCallback : public gg::ipc::LocalTopicCallback {
    void operator()(
        std::string_view topic, gg::Object payload, gg::ipc::Subscription& handle
    ) override {
        (void) topic;
        (void) handle;

        // Payload is a Buffer containing JSON string.
        if (payload.index() != GG_TYPE_BUF) {
            std::cerr << "Expected Buffer message\n";
            return;
        }

        // Extract buffer using gg::get.
        auto buffer = gg::get<std::span<uint8_t>>(payload);
        std::string json_str(reinterpret_cast<const char*>(buffer.data()), buffer.size());

        // Simple JSON parsing for demo. The sensor ID is hard-coded here;
        // only the temperature is read from the message.
        std::string sensor_id = "sensor1";
        int64_t temperature = 0;

        // Extract temperature (simple string search).
        size_t temp_pos = json_str.find("\"temperature\":");
        if (temp_pos != std::string::npos) {
            temp_pos += 14; // Skip past the 14-character key "temperature":
            size_t end_pos = json_str.find_first_of(",}", temp_pos);
            if (end_pos != std::string::npos) {
                temperature = std::stoll(json_str.substr(temp_pos, end_pos - temp_pos));
            }
        }

        std::cout << "Received from sensor " << sensor_id << ": "
                  << temperature << "°F\n";

        if (temperature > 80) {
            std::lock_guard<std::mutex> lock(alert_mutex);
            pending_alert = {sensor_id, temperature};
            has_pending_alert = true;
            alert_cv.notify_one();
        }
    }
};

int main() {
    // Disable stdout buffering for real-time logging.
    std::cout.setf(std::ios::unitbuf);
    std::cout << "Temperature Processor component starting..." << std::endl;

    auto& client = gg::ipc::Client::get();
    std::cout << "Got client instance" << std::endl;

    auto error = client.connect();
    std::cout << "Connect returned, error code: " << error.value() << std::endl;
    if (error) {
        std::cerr << "Failed to connect to Greengrass nucleus: " << error.message() << std::endl;
        return 1;
    }
    std::cout << "Connected to Greengrass IPC" << std::endl;

    // Start alert publisher thread.
    std::thread alert_thread(alert_publisher_thread);
    alert_thread.detach();

    // Handler must be static lifetime if subscription handle is not held.
    static SensorCallback handler;
    error = client.subscribe_to_topic("sensors/temperature", handler);
    if (error) {
        std::cerr << "Failed to subscribe to topic: " << error.message() << std::endl;
        return 1;
    }
    std::cout << "Subscribed to sensors/temperature" << std::endl;
    std::cout << "Waiting for sensor data..." << std::endl;

    // Keep running.
    while (true) {
        using namespace std::chrono_literals;
        std::this_thread::sleep_for(1s);
    }

    return 0;
}
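2. Build and package the component

Some languages require a build or packaging step before deployment.

Python

Python doesn't require compilation. The component can use the .py file directly.

Java

To build an executable JAR that bundles all dependencies:

  1. Create a pom.xml file in your project directory: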

    <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!-- Basic project information: organization, component name, and version --> <groupId>com.example</groupId> <artifactId>temperature-processor</artifactId> <version>1.0.0</version> <properties> <!-- Java 11 LTS (Long Term Support) is recommended for Greengrass v2 components --> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <!-- AWS IoT Device SDK for Java - provides IPC client for Greengrass v2 local communication --> <dependency> <groupId>software.amazon.awssdk.iotdevicesdk</groupId> <artifactId>aws-iot-device-sdk</artifactId> <version>1.25.1</version> </dependency> </dependencies> <build> <plugins> <!-- Maven Shade Plugin - creates a standalone JAR with all dependencies included for Greengrass deployment --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <!-- Set the main class for the executable JAR --> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>TemperatureProcessor</mainClass> </transformer> </transformers> <filters> <!-- Exclude signature files to avoid security exceptions --> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
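  2. Build the JAR:

    mvn clean package

    This creates target/temperature-processor-1.0.0.jar, which includes all dependencies.

  3. Upload the JAR to an S3 bucket for deployment. The Java recipe in this guide refers to the artifact as TemperatureProcessor.jar, so either rename the JAR when you upload it or update the artifact URI and run command in the recipe to match.

JavaScript

To package a Node.js component with all of its dependencies:

  1. Create a package.json file: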

    { "name": "temperature-processor", "version": "1.0.0", "description": "Temperature processor component for Greengrass v2", "main": "TemperatureProcessor.js", "dependencies": { "aws-iot-device-sdk-v2": "^1.21.0" }, "engines": { "node": ">=14.0.0" } }
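  2. Install the dependencies on your development machine:

    npm install

    This creates a node_modules folder that contains the AWS IoT Device SDK v2.

  3. Package the component for deployment:

    zip -r TemperatureProcessor.zip TemperatureProcessor.js node_modules/ package.json
  4. Upload the zip file to an S3 bucket for deployment.

Note

The Greengrass device must have the Node.js runtime (version 14 or later) installed. You don't need to run npm install on the Greengrass core device, because the component artifact includes all dependencies in the bundled node_modules folder.

C

Prerequisites:

To build the SDK and the component, you need the following build dependencies:

  • GCC or Clang

  • CMake (version 3.22 or later)

  • Make or Ninja

Install the build dependencies:

On Ubuntu/Debian:

sudo apt install build-essential cmake

On Amazon Linux:

sudo yum install gcc cmake make

Create a CMakeLists.txt file for your component: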

cmake_minimum_required(VERSION 3.10)
project(TemperatureProcessor C)

set(CMAKE_C_STANDARD 11)

# Add AWS Greengrass Component SDK
add_subdirectory(aws-greengrass-component-sdk)

# Build your component executable
add_executable(temperature_processor temperature_processor.c)
target_link_libraries(temperature_processor gg-sdk)

Build steps:

# Clone the AWS Greengrass Component SDK into your project
git clone https://github.com/aws-greengrass/aws-greengrass-component-sdk.git

# Build your component
cmake -B build -D CMAKE_BUILD_TYPE=MinSizeRel
make -C build -j$(nproc)

# The binary 'temperature_processor' is in ./build/
# Upload this binary to S3 for deployment
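C++

Prerequisites:

To build the SDK and the component, you need the following build dependencies:

  • GCC or Clang with C++20 support

  • CMake (version 3.22 or later)

  • Make or Ninja

Install the build dependencies:

On Ubuntu/Debian:

sudo apt install build-essential cmake

On Amazon Linux:

sudo yum install gcc-c++ cmake make

Create a CMakeLists.txt file for your component: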

cmake_minimum_required(VERSION 3.10)
project(TemperatureProcessor CXX)

set(CMAKE_CXX_STANDARD 20)

# Add SDK as subdirectory
add_subdirectory(aws-greengrass-component-sdk)

# Add C++ SDK subdirectory
add_subdirectory(aws-greengrass-component-sdk/cpp)

add_executable(temperature_processor temperature_processor.cpp)
target_link_libraries(temperature_processor gg-sdk++)

Build steps:

# Clone the AWS Greengrass Component SDK into your project
git clone https://github.com/aws-greengrass/aws-greengrass-component-sdk.git

# Build your component
cmake -B build -D CMAKE_BUILD_TYPE=MinSizeRel
make -C build -j$(nproc)

# The binary 'temperature_processor' will be in ./build/
# Upload this binary to S3 for deployment
3. Component recipe

Update the "resources" arrays in the recipe with the actual topics that your component uses.
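The "resources" arrays sit inside the accessControl section of the recipes that follow. As a rough illustration of how that section depends on your topics, the sketch below generates it from two topic lists. `build_pubsub_access_control` is a hypothetical helper, not part of any AWS tool; the key names, operation names, and policy ID pattern are copied from the recipes in this guide.

```python
import json
from typing import Dict, List


def build_pubsub_access_control(
    component_name: str,
    subscribe_topics: List[str],
    publish_topics: List[str],
) -> Dict[str, dict]:
    """Build an accessControl object shaped like the recipes in this guide."""
    policies: Dict[str, dict] = {}
    if subscribe_topics:
        policies[f"{component_name}:pubsub:1"] = {
            "policyDescription": "Allows access to subscribe to topics",
            "operations": ["aws.greengrass#SubscribeToTopic"],
            "resources": list(subscribe_topics),
        }
    if publish_topics:
        policies[f"{component_name}:pubsub:2"] = {
            "policyDescription": "Allows access to publish to topics",
            "operations": ["aws.greengrass#PublishToTopic"],
            "resources": list(publish_topics),
        }
    return {"aws.greengrass.ipc.pubsub": policies}


if __name__ == "__main__":
    access_control = build_pubsub_access_control(
        "com.example.TemperatureProcessor",
        subscribe_topics=["sensors/temperature"],
        publish_topics=["component/alerts"],
    )
    # Paste the printed object under DefaultConfiguration.accessControl.
    print(json.dumps(access_control, indent=2))
```

Keeping the topic lists in one place makes it harder for the component code and the recipe to drift apart.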

Python
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.TemperatureProcessor", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "Receives sensor data and forwards alerts to AlertHandler", "ComponentPublisher": "[Your Company]", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.TemperatureProcessor:pubsub:1": { "policyDescription": "Allows access to subscribe to sensor topics", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "sensors/temperature" ] }, "com.example.TemperatureProcessor:pubsub:2": { "policyDescription": "Allows access to publish to alert topics", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "component/alerts" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "python3 -u {artifacts:path}/temperature_processor.py" }, "Artifacts": [ { "Uri": "s3://YOUR-BUCKET/artifacts/com.example.TemperatureProcessor/1.0.0/temperature_processor.py" } ] } ] }
Java
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.TemperatureProcessor", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "Receives sensor data and forwards alerts to AlertHandler", "ComponentPublisher": "[Your Company]", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.TemperatureProcessor:pubsub:1": { "policyDescription": "Allows access to subscribe to sensor topics", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "sensors/temperature" ] }, "com.example.TemperatureProcessor:pubsub:2": { "policyDescription": "Allows access to publish to alert topics", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "component/alerts" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "java -jar {artifacts:path}/TemperatureProcessor.jar" }, "Artifacts": [ { "Uri": "s3://YOUR-BUCKET/artifacts/com.example.TemperatureProcessor/1.0.0/TemperatureProcessor.jar" } ] } ] }
JavaScript
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.TemperatureProcessor", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "Receives sensor data and forwards alerts to AlertHandler", "ComponentPublisher": "[Your Company]", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.TemperatureProcessor:pubsub:1": { "policyDescription": "Allows access to subscribe to sensor topics", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "sensors/temperature" ] }, "com.example.TemperatureProcessor:pubsub:2": { "policyDescription": "Allows access to publish to alert topics", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "component/alerts" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "cd {artifacts:decompressedPath}/TemperatureProcessor && node TemperatureProcessor.js" }, "Artifacts": [ { "Uri": "s3://YOUR-BUCKET/artifacts/com.example.TemperatureProcessor/1.0.0/TemperatureProcessor.zip", "Unarchive": "ZIP" } ] } ] }
C/C++
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.TemperatureProcessor", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "Receives sensor data and forwards alerts to AlertHandler", "ComponentPublisher": "[Your Company]", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.TemperatureProcessor:pubsub:1": { "policyDescription": "Allows access to subscribe to sensor topics", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "sensors/temperature" ] }, "com.example.TemperatureProcessor:pubsub:2": { "policyDescription": "Allows access to publish to alert topics", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "component/alerts" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/temperature_processor" }, "Artifacts": [ { "Uri": "s3://YOUR-BUCKET/artifacts/com.example.TemperatureProcessor/1.0.0/temperature_processor" } ] } ] }

案例 2:雲端通訊

此案例示範將與 通訊的 V1 Lambda 函數轉換為 V2 AWS IoT Core 一般元件。

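Application architecture

This example uses a cloud-connected architecture:

  • AWS IoT Core sends commands to the device

  • The controller Lambda receives and processes the commands

  • The controller Lambda sends telemetry data back to AWS IoT Core

This example focuses on the controller Lambda, which demonstrates both receiving messages from AWS IoT Core and publishing messages to it.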

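V1 group subscriptions

In AWS IoT Greengrass V1, the following group subscriptions enable communication between the Lambda function and AWS IoT Core:

Subscription 1: IoT Cloud → Lambda

  • Source: IoT Cloud

  • Target: Lambda (DeviceController)

  • Topic: commands/device1

Subscription 2: Lambda → IoT Cloud

  • Source: Lambda (DeviceController)

  • Target: IoT Cloud

  • Topic: telemetry/device1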
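The two V1 subscriptions above correspond to the MQTT proxy IPC permissions that a V2 component declares in its recipe. The following Python sketch illustrates that mapping; it is not an official migration tool. The `Subscription` tuple, the `CLOUD` marker, and the `to_access_control` helper are hypothetical names introduced for this example. A subscription whose source is the cloud becomes an `aws.greengrass#SubscribeToIoTCore` permission, and one whose target is the cloud becomes an `aws.greengrass#PublishToIoTCore` permission.

```python
from typing import NamedTuple

CLOUD = "cloud"  # hypothetical marker for the V1 "IoT Cloud" endpoint


class Subscription(NamedTuple):
    """One V1 group subscription (illustrative model, not an SDK type)."""
    source: str
    target: str
    topic: str


def to_access_control(component_name: str, subscriptions: list) -> dict:
    """Build an accessControl section for the V2 MQTT proxy IPC service."""
    policies = {}
    for sub in subscriptions:
        if sub.source == CLOUD:
            operation = "aws.greengrass#SubscribeToIoTCore"
        elif sub.target == CLOUD:
            operation = "aws.greengrass#PublishToIoTCore"
        else:
            continue  # Lambda-to-Lambda traffic maps to local pub/sub instead
        policy_id = f"{component_name}:mqttproxy:{len(policies) + 1}"
        policies[policy_id] = {
            "policyDescription": f"Allows {operation.split('#')[1]} on {sub.topic}",
            "operations": [operation],
            "resources": [sub.topic],
        }
    return {"aws.greengrass.ipc.mqttproxy": policies}


v1_subscriptions = [
    Subscription(CLOUD, "DeviceController", "commands/device1"),
    Subscription("DeviceController", CLOUD, "telemetry/device1"),
]
access_control = to_access_control("com.example.DeviceController", v1_subscriptions)
```

The resulting dictionary has the same shape as the `accessControl` section of the component recipes shown later in this scenario.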

Controller Lambda function (V1)

Python
import greengrasssdk import json import time iot_client = greengrasssdk.client('iot-data') def lambda_handler(event, context): """ Receives commands from IoT Core, processes them, and sends telemetry back to cloud """ # Receive command from IoT Core. command = event.get('command') device_id = event.get('device_id', 'device1') print(f"Received command from cloud: {command}") # Process command. if command == 'get_status': status = get_device_status() # Send telemetry back to IoT Core. telemetry_data = { 'device_id': device_id, 'status': status, 'timestamp': time.time() } iot_client.publish( topic=f'telemetry/{device_id}', payload=json.dumps(telemetry_data) ) print(f"Telemetry sent to cloud: {telemetry_data}") return {'statusCode': 200} def get_device_status(): """Get current device status""" # Simulate getting device status. return 'online'
Java
import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.greengrass.javasdk.IotDataClient; import com.amazonaws.greengrass.javasdk.model.PublishRequest; import com.google.gson.Gson; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; public class DeviceControllerLambda { private static final Gson gson = new Gson(); private final IotDataClient iotDataClient; public DeviceControllerLambda() { this.iotDataClient = new IotDataClient(); } public String handleRequest(Map<String, Object> event, Context context) { /* * Receives commands from IoT Core, * processes them, and sends telemetry back to cloud */ // Receive command from IoT Core. String command = (String) event.get("command"); String deviceId = event.containsKey("device_id") ? (String) event.get("device_id") : "device1"; System.out.println("Received command from cloud: " + command); // Process command. if ("get_status".equals(command)) { String status = getDeviceStatus(); // Send telemetry back to IoT Core. Map<String, Object> telemetryData = new HashMap<>(); telemetryData.put("device_id", deviceId); telemetryData.put("status", status); telemetryData.put("timestamp", System.currentTimeMillis() / 1000.0); String payload = gson.toJson(telemetryData); PublishRequest publishRequest = new PublishRequest() .withTopic("telemetry/" + deviceId) .withPayload(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))); iotDataClient.publish(publishRequest); System.out.println("Telemetry sent to cloud: " + telemetryData); } return "Success"; } private String getDeviceStatus() { // Simulate getting device status. return "online"; } }
JavaScript
const greengrasssdk = require('aws-greengrass-core-sdk'); const iotClient = new greengrasssdk.IotData(); /** * Receives commands from IoT Core and sends telemetry back. */ exports.handler = function(event, context) { console.log('Received command from IoT Core:', JSON.stringify(event)); const command = event.command; const deviceId = event.device_id || 'device1'; console.log(`Processing command: ${command}`); if (command === 'get_status') { const status = 'online'; const telemetryData = { device_id: deviceId, status: status, timestamp: Date.now() / 1000 }; // Publish telemetry to IoT Core using greengrasssdk. const params = { topic: `telemetry/${deviceId}`, payload: JSON.stringify(telemetryData) }; iotClient.publish(params, (err) => { if (err) { console.error('Error publishing telemetry:', err); context.fail(err); } else { console.log('Telemetry sent to IoT Core:', JSON.stringify(telemetryData)); context.succeed('Success'); } }); } else { context.succeed('Success'); } };
C
#include <aws/greengrass/greengrasssdk.h> #include <stdio.h> #include <string.h> #include <jansson.h> #include <time.h> static aws_greengrass_iot_data_client *iot_client = NULL; const char* get_device_status(void) { // Simulate getting device status. return "online"; } void on_cloud_command(const char *topic, const uint8_t *payload, size_t payload_len, void *user_data) { // Parse incoming command from IoT Core. char *payload_str = strndup((char *)payload, payload_len); json_error_t error; json_t *event = json_loads(payload_str, 0, &error); free(payload_str); if (!event) { fprintf(stderr, "Error parsing JSON: %s\n", error.text); return; } // Extract command and device_id. json_t *command_obj = json_object_get(event, "command"); json_t *device_id_obj = json_object_get(event, "device_id"); const char *command = json_string_value(command_obj); const char *device_id = device_id_obj ? json_string_value(device_id_obj) : "device1"; printf("Received command from cloud: %s\n", command); // Process command. if (command && strcmp(command, "get_status") == 0) { const char *status = get_device_status(); // Send telemetry back to IoT Core. json_t *telemetry_data = json_object(); json_object_set_new(telemetry_data, "device_id", json_string(device_id)); json_object_set_new(telemetry_data, "status", json_string(status)); json_object_set_new(telemetry_data, "timestamp", json_real(time(NULL))); char *telemetry_payload = json_dumps(telemetry_data, JSON_COMPACT); // Publish telemetry to IoT Core. 
char telemetry_topic[256]; snprintf(telemetry_topic, sizeof(telemetry_topic), "telemetry/%s", device_id); aws_greengrass_publish_params params = { .topic = telemetry_topic, .payload = (uint8_t *)telemetry_payload, .payload_len = strlen(telemetry_payload) }; aws_greengrass_iot_data_publish(iot_client, &params); printf("Telemetry sent to cloud: %s\n", telemetry_payload); free(telemetry_payload); json_decref(telemetry_data); } json_decref(event); } int main(int argc, char *argv[]) { // Initialize Greengrass SDK. iot_client = aws_greengrass_iot_data_client_new(); // Subscribe to commands from IoT Core. aws_greengrass_subscribe_params subscribe_params = { .topic = "commands/device1", .callback = on_cloud_command, .user_data = NULL }; aws_greengrass_iot_data_subscribe(iot_client, &subscribe_params); printf("Device Controller Lambda started\n"); printf("Subscribed to commands/device1\n"); printf("Waiting for commands from IoT Core...\n"); // Keep the Lambda running. while (1) { sleep(1); } return 0; }
C++
#include <aws/greengrass/greengrasssdk.h> #include <iostream> #include <string> #include <memory> #include <jansson.h> #include <ctime> #include <unistd.h> class DeviceController { private: std::unique_ptr<aws_greengrass_iot_data_client, decltype(&aws_greengrass_iot_data_client_destroy)> iot_client; static void message_callback_wrapper(const char *topic, const uint8_t *payload, size_t payload_len, void *user_data) { auto* controller = static_cast<DeviceController*>(user_data); controller->on_cloud_command(topic, payload, payload_len); } std::string get_device_status() { // Simulate getting device status. return "online"; } public: DeviceController() : iot_client(aws_greengrass_iot_data_client_new(), aws_greengrass_iot_data_client_destroy) { if (!iot_client) { throw std::runtime_error("Failed to create Greengrass IoT client"); } } void on_cloud_command(const char *topic, const uint8_t *payload, size_t payload_len) { // Parse incoming command from IoT Core. std::string payload_str(reinterpret_cast<const char*>(payload), payload_len); json_error_t error; json_t *event = json_loads(payload_str.c_str(), 0, &error); if (!event) { std::cerr << "Error parsing JSON: " << error.text << std::endl; return; } // Extract command and device_id. json_t *command_obj = json_object_get(event, "command"); json_t *device_id_obj = json_object_get(event, "device_id"); const char *command = json_string_value(command_obj); const char *device_id = device_id_obj ? json_string_value(device_id_obj) : "device1"; std::cout << "Received command from cloud: " << command << std::endl; // Process command. if (command && std::string(command) == "get_status") { std::string status = get_device_status(); // Send telemetry back to IoT Core. 
json_t *telemetry_data = json_object(); json_object_set_new(telemetry_data, "device_id", json_string(device_id)); json_object_set_new(telemetry_data, "status", json_string(status.c_str())); json_object_set_new(telemetry_data, "timestamp", json_real(std::time(nullptr))); char *telemetry_payload = json_dumps(telemetry_data, JSON_COMPACT); // Publish telemetry to IoT Core. std::string telemetry_topic = "telemetry/" + std::string(device_id); aws_greengrass_publish_params params = { .topic = telemetry_topic.c_str(), .payload = reinterpret_cast<uint8_t*>(telemetry_payload), .payload_len = strlen(telemetry_payload) }; aws_greengrass_iot_data_publish(iot_client.get(), &params); std::cout << "Telemetry sent to cloud: " << telemetry_payload << std::endl; free(telemetry_payload); json_decref(telemetry_data); } json_decref(event); } void subscribe_to_topic(const std::string& topic) { aws_greengrass_subscribe_params subscribe_params = { .topic = topic.c_str(), .callback = message_callback_wrapper, .user_data = this }; aws_greengrass_iot_data_subscribe(iot_client.get(), &subscribe_params); std::cout << "Device Controller Lambda started" << std::endl; std::cout << "Subscribed to " << topic << std::endl; std::cout << "Waiting for commands from IoT Core..." << std::endl; } void run() { // Keep the Lambda running. while (true) { sleep(1); } } }; int main(int argc, char *argv[]) { try { DeviceController controller; controller.subscribe_to_topic("commands/device1"); controller.run(); } catch (const std::exception& e) { std::cerr << "Error: " << e.what() << std::endl; return 1; } return 0; }
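All of the V1 handlers above share the same core logic: read `command` and `device_id` from the event, and answer `get_status` with a telemetry message on `telemetry/<device_id>`. One way to ease the migration is to isolate that logic in a function that depends on neither SDK. The Python sketch below shows this as a suggestion; it is not part of the original samples. The name `handle_command` and its return shape are hypothetical.

```python
import json
import time
from typing import Optional, Tuple


def get_device_status() -> str:
    """Simulate getting device status, as in the samples above."""
    return "online"


def handle_command(event: dict, now: Optional[float] = None) -> Optional[Tuple[str, bytes]]:
    """Return (topic, payload) to publish, or None if nothing should be sent."""
    command = event.get("command")
    device_id = event.get("device_id", "device1")
    if command != "get_status":
        return None
    telemetry = {
        "device_id": device_id,
        "status": get_device_status(),
        # Allow the caller to inject a timestamp so the function is testable.
        "timestamp": time.time() if now is None else now,
    }
    return f"telemetry/{device_id}", json.dumps(telemetry).encode("utf-8")


result = handle_command({"command": "get_status"}, now=0.0)
```

With this split, only the thin wrapper that receives the event and publishes the returned topic and payload has to change between the V1 Lambda function and the V2 component.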

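Generic component (V2)

To achieve the same functionality in AWS IoT Greengrass V2, create a generic component with the following:

1. Component code
Python

Prerequisites: Before you use this component code, install and verify the AWS IoT Device SDK for Python on your Greengrass device: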

# Install the SDK
pip3 install awsiotsdk

# Verify installation
python3 -c "import awsiot.greengrasscoreipc.clientv2; print('SDK installed successfully')"

If you encounter dependency conflicts during installation, try installing a specific version of the AWS IoT Device SDK.
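The verification step can also be scripted, for example as a pre-flight check in a provisioning script. The sketch below uses only the Python standard library. The helper name `sdk_available` is hypothetical, and it checks only that the top-level `awsiot` package can be located, not that a particular SDK version is installed.

```python
import importlib.util


def sdk_available(package: str = "awsiot") -> bool:
    """Return True if the top-level package can be located without importing it."""
    return importlib.util.find_spec(package) is not None


if __name__ == "__main__":
    if sdk_available():
        print("awsiot package found")
    else:
        print("awsiot package not found; run: pip3 install awsiotsdk")
```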

If the verification command prints "SDK installed successfully", you can use the component code:

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import QOS import json import time ipc_client = GreengrassCoreIPCClientV2() def on_command(event): """ Receives commands from IoT Core, processes them, and sends telemetry back to cloud """ try: # Receive command from IoT Core. data = json.loads(event.message.payload.decode('utf-8')) command = data.get('command') device_id = data.get('device_id', 'device1') print(f"Received command from cloud: {command}") # Process command. if command == 'get_status': status = get_device_status() # Send telemetry back to IoT Core. telemetry_data = { 'device_id': device_id, 'status': status, 'timestamp': time.time() } ipc_client.publish_to_iot_core( topic_name=f'telemetry/{device_id}', qos=QOS.AT_LEAST_ONCE, payload=json.dumps(telemetry_data).encode('utf-8') ) print(f"Telemetry sent to cloud: {telemetry_data}") except Exception as e: print(f"Error processing command: {e}") def get_device_status(): """Get current device status""" # Simulate getting device status. return 'online' def main(): print("Device Controller component starting...") # Subscribe to commands from IoT Core. ipc_client.subscribe_to_iot_core( topic_name='commands/device1', qos=QOS.AT_LEAST_ONCE, on_stream_event=on_command ) print("Subscribed to commands/device1 from IoT Core") print("Waiting for commands from cloud...") # Keep running. while True: time.sleep(1) if __name__ == '__main__': main()
Java
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.QOS; import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest; import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage; import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.regex.Pattern; import java.util.regex.Matcher; public class DeviceController { private static GreengrassCoreIPCClientV2 ipcClient; public static void main(String[] args) { System.out.println("Device Controller component starting..."); try (GreengrassCoreIPCClientV2 client = GreengrassCoreIPCClientV2.builder().build()) { ipcClient = client; SubscribeToIoTCoreRequest subscribeRequest = new SubscribeToIoTCoreRequest() .withTopicName("commands/device1") .withQos(QOS.AT_LEAST_ONCE); ipcClient.subscribeToIoTCore( subscribeRequest, DeviceController::onCommand, Optional.empty(), Optional.empty() ); System.out.println("Subscribed to commands/device1 from IoT Core"); System.out.println("Waiting for commands from cloud..."); while (true) { Thread.sleep(1000); } } catch (Exception e) { System.err.println("Error: " + e.getMessage()); e.printStackTrace(); } } public static void onCommand(IoTCoreMessage message) { try { String payload = new String(message.getMessage().getPayload(), StandardCharsets.UTF_8); // Simple JSON parsing. String command = extractJsonValue(payload, "command"); String deviceId = extractJsonValue(payload, "device_id"); if (deviceId == null || deviceId.isEmpty()) { deviceId = "device1"; } System.out.println("Received command from cloud: " + command); if ("get_status".equals(command)) { String status = getDeviceStatus(); // Build JSON manually. 
String telemetryJson = String.format( "{\"device_id\":\"%s\",\"status\":\"%s\",\"timestamp\":%.3f}", deviceId, status, System.currentTimeMillis() / 1000.0 ); byte[] telemetryBytes = telemetryJson.getBytes(StandardCharsets.UTF_8); PublishToIoTCoreRequest publishRequest = new PublishToIoTCoreRequest() .withTopicName("telemetry/" + deviceId) .withQos(QOS.AT_LEAST_ONCE) .withPayload(telemetryBytes); ipcClient.publishToIoTCore(publishRequest); System.out.println("Telemetry sent to cloud: " + telemetryJson); } } catch (Exception e) { System.err.println("Error processing command: " + e.getMessage()); } } private static String extractJsonValue(String json, String key) { Pattern pattern = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]+)\""); Matcher matcher = pattern.matcher(json); return matcher.find() ? matcher.group(1) : null; } private static String getDeviceStatus() { return "online"; } }
JavaScript
const greengrasscoreipc = require('aws-iot-device-sdk-v2').greengrasscoreipc; class DeviceController { constructor() { this.ipcClient = null; } async start() { console.log('Device Controller component starting...'); try { this.ipcClient = greengrasscoreipc.createClient(); await this.ipcClient.connect(); const subscribeRequest = { topicName: 'commands/device1', qos: 1 }; const streamingOperation = this.ipcClient.subscribeToIoTCore(subscribeRequest); streamingOperation.on('message', (message) => { this.onCommand(message); }); streamingOperation.on('streamError', (error) => { console.error('Stream error:', error); }); streamingOperation.on('ended', () => { console.log('Subscription stream ended'); }); await streamingOperation.activate(); console.log('Subscribed to commands/device1 from IoT Core'); console.log('Waiting for commands from cloud...'); } catch (error) { console.error('Error starting component:', error); process.exit(1); } } async onCommand(message) { try { const payload = message.message.payload.toString('utf-8'); const data = JSON.parse(payload); const command = data.command; const deviceId = data.device_id || 'device1'; console.log(`Received command from cloud: ${command}`); if (command === 'get_status') { const status = this.getDeviceStatus(); const telemetryData = { device_id: deviceId, status: status, timestamp: Date.now() / 1000 }; const telemetryJson = JSON.stringify(telemetryData); const publishRequest = { topicName: `telemetry/${deviceId}`, qos: 1, payload: Buffer.from(telemetryJson, 'utf-8') }; await this.ipcClient.publishToIoTCore(publishRequest); console.log(`Telemetry sent to cloud: ${telemetryJson}`); } } catch (error) { console.error('Error processing command:', error); } } getDeviceStatus() { return 'online'; } } // Start the component. const controller = new DeviceController(); controller.start();
C
#include <gg/buffer.h> #include <gg/error.h> #include <gg/ipc/client.h> #include <gg/map.h> #include <gg/object.h> #include <gg/sdk.h> #include <unistd.h> #include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #include <pthread.h> #define COMMAND_TOPIC "commands/device1" #define TELEMETRY_TOPIC "telemetry/device1" typedef struct { char device_id[64]; char command[64]; } CommandData; static pthread_mutex_t command_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t command_cond = PTHREAD_COND_INITIALIZER; static CommandData pending_command; static bool has_pending_command = false; const char* get_device_status(void) { // Simulate getting device status. return "online"; } static void *telemetry_publisher_thread(void *arg) { (void) arg; while (true) { pthread_mutex_lock(&command_mutex); while (!has_pending_command) { pthread_cond_wait(&command_cond, &command_mutex); } CommandData cmd = pending_command; has_pending_command = false; pthread_mutex_unlock(&command_mutex); // Process command. if (strcmp(cmd.command, "get_status") == 0) { const char *status = get_device_status(); // Create telemetry JSON string. char telemetry_json[512]; snprintf(telemetry_json, sizeof(telemetry_json), "{\"device_id\":\"%s\",\"status\":\"%s\",\"timestamp\":%ld}", cmd.device_id, status, time(NULL)); GgBuffer telemetry_buf = { .data = (uint8_t *)telemetry_json, .len = strlen(telemetry_json) }; // Publish telemetry to IoT Core. 
GgError ret = ggipc_publish_to_iot_core(GG_STR(TELEMETRY_TOPIC), telemetry_buf, 0); if (ret != GG_ERR_OK) { fprintf(stderr, "Failed to publish telemetry to IoT Core\n"); } else { printf("Telemetry sent to cloud: device_id=%s, status=%s\n", cmd.device_id, status); } } } return NULL; } static void on_cloud_command( void *ctx, GgBuffer topic, GgBuffer payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) topic; (void) handle; printf("Received command from IoT Core: %.*s\n", (int)payload.len, payload.data); // Parse JSON payload (comes as raw buffer from IoT Core). // For simplicity, we'll do basic string parsing. // Extract command and device_id from JSON string. char payload_str[512]; snprintf(payload_str, sizeof(payload_str), "%.*s", (int)payload.len, payload.data); // Simple JSON parsing (looking for "command":"get_status"). char *command_start = strstr(payload_str, "\"command\""); char *device_id_start = strstr(payload_str, "\"device_id\""); if (command_start) { pthread_mutex_lock(&command_mutex); char *cmd_value = strstr(command_start, ":"); if (cmd_value) { cmd_value = strchr(cmd_value, '"'); if (cmd_value) { cmd_value++; char *cmd_end = strchr(cmd_value, '"'); if (cmd_end) { size_t cmd_len = cmd_end - cmd_value; if (cmd_len < sizeof(pending_command.command)) { strncpy(pending_command.command, cmd_value, cmd_len); pending_command.command[cmd_len] = '\0'; } } } } // Extract device_id or use default. 
if (device_id_start) { char *dev_value = strstr(device_id_start, ":"); if (dev_value) { dev_value = strchr(dev_value, '"'); if (dev_value) { dev_value++; char *dev_end = strchr(dev_value, '"'); if (dev_end) { size_t dev_len = dev_end - dev_value; if (dev_len < sizeof(pending_command.device_id)) { strncpy(pending_command.device_id, dev_value, dev_len); pending_command.device_id[dev_len] = '\0'; } } } } } else { strcpy(pending_command.device_id, "device1"); } printf("Received command from cloud: %s\n", pending_command.command); has_pending_command = true; pthread_cond_signal(&command_cond); pthread_mutex_unlock(&command_mutex); } } int main(void) { setvbuf(stdout, NULL, _IONBF, 0); printf("Device Controller component starting...\n"); gg_sdk_init(); GgError ret = ggipc_connect(); if (ret != GG_ERR_OK) { fprintf(stderr, "Failed to connect to Greengrass nucleus\n"); exit(1); } printf("Connected to Greengrass IPC\n"); // Start telemetry publisher thread. pthread_t telemetry_thread; if (pthread_create(&telemetry_thread, NULL, telemetry_publisher_thread, NULL) != 0) { fprintf(stderr, "Failed to create telemetry publisher thread\n"); exit(1); } // Subscribe to commands from IoT Core. GgIpcSubscriptionHandle handle; ret = ggipc_subscribe_to_iot_core( GG_STR(COMMAND_TOPIC), 0, &on_cloud_command, NULL, &handle ); if (ret != GG_ERR_OK) { fprintf(stderr, "Failed to subscribe to IoT Core topic\n"); exit(1); } printf("Subscribed to %s\n", COMMAND_TOPIC); printf("Waiting for commands from IoT Core...\n"); while (true) { sleep(1); } return 0; }
C++
#include <gg/ipc/client.hpp> #include <gg/buffer.hpp> #include <gg/object.hpp> #include <gg/types.hpp> #include <chrono> #include <condition_variable> #include <ctime> #include <iostream> #include <mutex> #include <string> #include <string_view> #include <thread> constexpr std::string_view COMMAND_TOPIC = "commands/device1"; constexpr std::string_view TELEMETRY_TOPIC = "telemetry/device1"; struct CommandData { std::string device_id; std::string command; }; static std::mutex command_mutex; static std::condition_variable command_cv; static CommandData pending_command; static bool has_pending_command = false; std::string get_device_status() { // Simulate getting device status. return "online"; } void telemetry_publisher_thread() { auto& client = gg::ipc::Client::get(); while (true) { std::unique_lock<std::mutex> lock(command_mutex); command_cv.wait(lock, [] { return has_pending_command; }); CommandData cmd = pending_command; has_pending_command = false; lock.unlock(); // Process command. if (cmd.command == "get_status") { std::string status = get_device_status(); // Get current timestamp. auto now = std::time(nullptr); // Create telemetry JSON payload. std::string telemetry_payload = "{\"device_id\":\"" + cmd.device_id + "\",\"status\":\"" + status + "\",\"timestamp\":" + std::to_string(now) + "}"; // Publish telemetry to IoT Core. gg::Buffer telemetry_buffer(telemetry_payload); auto error = client.publish_to_iot_core(TELEMETRY_TOPIC, telemetry_buffer); if (error) { std::cerr << "Failed to publish telemetry to IoT Core: " << error.message() << std::endl; } else { std::cout << "Telemetry sent to cloud: device_id=" << cmd.device_id << ", status=" << status << std::endl; } } } } class CloudCommandCallback : public gg::ipc::IoTCoreTopicCallback { void operator()( std::string_view topic, gg::Object payload, gg::ipc::Subscription& handle ) override { (void) topic; (void) handle; // Payload is a Buffer containing JSON string from IoT Core. 
if (payload.index() != GG_TYPE_BUF) { std::cerr << "Expected Buffer message\n"; return; } // Extract buffer. auto buffer = gg::get<std::span<uint8_t>>(payload); std::string json_str(reinterpret_cast<const char*>(buffer.data()), buffer.size()); std::cout << "Received command from IoT Core: " << json_str << std::endl; // Simple JSON parsing for demo. std::string command; std::string device_id = "device1"; // Default // Extract command. size_t cmd_pos = json_str.find("\"command\":"); if (cmd_pos != std::string::npos) { size_t start = json_str.find("\"", cmd_pos + 10) + 1; size_t end = json_str.find("\"", start); if (end != std::string::npos) { command = json_str.substr(start, end - start); } } // Extract device_id if present. size_t dev_pos = json_str.find("\"device_id\":"); if (dev_pos != std::string::npos) { size_t start = json_str.find("\"", dev_pos + 12) + 1; size_t end = json_str.find("\"", start); if (end != std::string::npos) { device_id = json_str.substr(start, end - start); } } if (!command.empty()) { std::lock_guard<std::mutex> lock(command_mutex); pending_command = {device_id, command}; has_pending_command = true; command_cv.notify_one(); std::cout << "Received command from cloud: " << command << std::endl; } } }; int main() { // Disable stdout buffering for real-time logging in systemd/Greengrass. std::cout.setf(std::ios::unitbuf); std::cout << "Device Controller component starting..." << std::endl; auto& client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to connect to Greengrass nucleus: " << error.message() << std::endl; return 1; } std::cout << "Connected to Greengrass IPC" << std::endl; // Start telemetry publisher thread. std::thread telemetry_thread(telemetry_publisher_thread); telemetry_thread.detach(); // Subscribe to commands from IoT Core. 
static CloudCommandCallback handler; error = client.subscribe_to_iot_core(COMMAND_TOPIC, handler); if (error) { std::cerr << "Failed to subscribe to IoT Core topic: " << error.message() << std::endl; return 1; } std::cout << "Subscribed to " << COMMAND_TOPIC << std::endl; std::cout << "Waiting for commands from IoT Core..." << std::endl; // Keep running. while (true) { using namespace std::chrono_literals; std::this_thread::sleep_for(1s); } return 0; }
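The component code above only runs when it is connected to a Greengrass nucleus, so it can be useful to exercise the command handler on a development machine first. The following Python sketch shows one way to do that. `FakeIpcClient` and `make_event` are hypothetical test doubles, not part of the AWS IoT Device SDK. They mimic only the two details the Python component relies on: the payload arrives as bytes at `event.message.payload`, and telemetry is sent through a `publish_to_iot_core(topic_name=..., qos=..., payload=...)` call.

```python
import json
from types import SimpleNamespace


class FakeIpcClient:
    """Hypothetical stand-in that records publishes instead of sending them."""

    def __init__(self):
        self.published = []

    def publish_to_iot_core(self, topic_name, qos, payload):
        self.published.append((topic_name, qos, payload))


def make_event(data: dict) -> SimpleNamespace:
    """Build an object shaped like the event passed to on_stream_event."""
    payload = json.dumps(data).encode("utf-8")
    return SimpleNamespace(message=SimpleNamespace(payload=payload))


def on_command(event, ipc_client, now=0.0):
    """Same logic as the Python component, with the client injected."""
    data = json.loads(event.message.payload.decode("utf-8"))
    device_id = data.get("device_id", "device1")
    if data.get("command") == "get_status":
        telemetry = {"device_id": device_id, "status": "online", "timestamp": now}
        ipc_client.publish_to_iot_core(
            topic_name=f"telemetry/{device_id}",
            qos="1",  # placeholder; the real component passes QOS.AT_LEAST_ONCE
            payload=json.dumps(telemetry).encode("utf-8"),
        )


client = FakeIpcClient()
on_command(make_event({"command": "get_status", "device_id": "device7"}), client)
on_command(make_event({"command": "unknown"}), client)  # ignored: no publish
```

Passing the client as a parameter, rather than using a module-level `ipc_client`, is the only structural change relative to the component code. It is a design choice made here for testability.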
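2. Build and package the component

Some languages require a build or packaging step before deployment.

Python

Python does not require compilation. The component can use the .py file directly.

Java

To build an executable JAR that bundles all dependencies:

  1. Create a pom.xml file in your project directory: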

    <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <!-- Basic project information: organization, component name, and version --> <groupId>com.example</groupId> <artifactId>device-controller</artifactId> <version>1.0.0</version> <properties> <!-- Java 11 LTS (Long Term Support) is recommended for Greengrass v2 components --> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <!-- AWS IoT Device SDK for Java - provides IPC client for Greengrass v2 cloud communication --> <dependency> <groupId>software.amazon.awssdk.iotdevicesdk</groupId> <artifactId>aws-iot-device-sdk</artifactId> <version>1.25.1</version> </dependency> </dependencies> <build> <plugins> <!-- Maven Shade Plugin - creates a standalone JAR with all dependencies included for Greengrass deployment --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <!-- Set the main class for the executable JAR --> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>DeviceController</mainClass> </transformer> </transformers> <filters> <!-- Exclude signature files to avoid security exceptions --> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
  2. Build the JAR:

    mvn clean package

    This creates target/device-controller-1.0.0.jar, which includes all dependencies.

  3. Upload the JAR to your S3 bucket for deployment.
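The recipes in step 3 reference artifacts with URIs of the form `s3://YOUR-BUCKET/artifacts/<component name>/<version>/<file name>`, so the uploaded object key has to match the recipe exactly. The Python sketch below composes such a URI; `artifact_uri` is a hypothetical helper, and the bucket name is a placeholder. Note that the Java recipe in this guide expects `DeviceController.jar`, whereas the Maven build produces `device-controller-1.0.0.jar`, so this sketch assumes the file is renamed on upload.

```python
def artifact_uri(bucket: str, component: str, version: str, file_name: str) -> str:
    """Compose an S3 artifact URI in the layout used by the recipes in this guide."""
    for part in (bucket, component, version, file_name):
        if not part or "/" in part:
            raise ValueError(f"invalid URI segment: {part!r}")
    return f"s3://{bucket}/artifacts/{component}/{version}/{file_name}"


uri = artifact_uri(
    "YOUR-BUCKET", "com.example.DeviceController", "1.0.0", "DeviceController.jar"
)
print(uri)
```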

JavaScript

To package the Node.js component with all of its dependencies:

  1. Create a package.json file:

    {
      "name": "device-controller",
      "version": "1.0.0",
      "description": "Device controller component for Greengrass v2",
      "main": "DeviceController.js",
      "dependencies": {
        "aws-iot-device-sdk-v2": "^1.21.0"
      },
      "engines": {
        "node": ">=14.0.0"
      }
    }
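  2. Install the dependencies on your development machine:

    npm install

    This creates a node_modules folder that contains the AWS IoT Device SDK v2.

  3. Package the component for deployment:

    zip -r DeviceController.zip DeviceController.js node_modules/ package.json
  4. Upload the zip file to your S3 bucket for deployment.

Note

The Greengrass device must have the Node.js runtime (version 14 or later) installed. You don't need to run npm install on the Greengrass core device, because the component artifact bundles all dependencies in the node_modules folder.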

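C

Prerequisites:

To build the SDK and the component, you need the following build dependencies:

  • GCC or Clang

  • CMake (version 3.22 or later)

  • Make or Ninja

Install the build dependencies:

On Ubuntu/Debian:

sudo apt install build-essential cmake

On Amazon Linux:

sudo yum install gcc cmake make

Create a CMakeLists.txt file for your component: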

cmake_minimum_required(VERSION 3.10)
project(DeviceController C)
set(CMAKE_C_STANDARD 11)

# Add AWS Greengrass Component SDK
add_subdirectory(aws-greengrass-component-sdk)

# Build your component executable
add_executable(device_controller device_controller.c)
target_link_libraries(device_controller gg-sdk)

Build steps:

# Clone the AWS Greengrass Component SDK into your project
git clone https://github.com/aws-greengrass/aws-greengrass-component-sdk.git

# Build your component
cmake -B build -D CMAKE_BUILD_TYPE=MinSizeRel
make -C build -j$(nproc)

# The binary 'device_controller' is in ./build/
# Upload this binary to S3 for deployment
C++

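Prerequisites:

To build the SDK and the component, you need the following build dependencies:

  • GCC or Clang with C++20 support

  • CMake (version 3.22 or later)

  • Make or Ninja

Install the build dependencies:

On Ubuntu/Debian:

sudo apt install build-essential cmake

On Amazon Linux:

sudo yum install gcc-c++ cmake make

Create a CMakeLists.txt file for your component: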

cmake_minimum_required(VERSION 3.10)
project(DeviceController CXX)
set(CMAKE_CXX_STANDARD 20)

# Add SDK as subdirectory
add_subdirectory(aws-greengrass-component-sdk)

# Add C++ SDK subdirectory
add_subdirectory(aws-greengrass-component-sdk/cpp)

add_executable(device_controller device_controller.cpp)
target_link_libraries(device_controller gg-sdk++)

Build steps:

# Clone the AWS Greengrass Component SDK into your project
git clone https://github.com/aws-greengrass/aws-greengrass-component-sdk.git

# Build your component
cmake -B build -D CMAKE_BUILD_TYPE=MinSizeRel
make -C build -j$(nproc)

# The binary 'device_controller' will be in ./build/
# Upload this binary to S3 for deployment
3. Component recipe

Update the "resources" arrays with the actual topics that your component uses.

Python
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.DeviceController", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "Receives commands from IoT Core and sends telemetry back to cloud", "ComponentPublisher": "[Your Company]", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.DeviceController:mqttproxy:1": { "policyDescription": "Allows access to subscribe to IoT Core topics", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "commands/device1" ] }, "com.example.DeviceController:mqttproxy:2": { "policyDescription": "Allows access to publish to IoT Core topics", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "telemetry/device1" ] } } } } }, "ComponentDependencies": { "aws.greengrass.TokenExchangeService": { "VersionRequirement": ">=2.0.0", "DependencyType": "HARD" } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "python3 -u {artifacts:path}/device_controller.py" }, "Artifacts": [ { "Uri": "s3://YOUR-BUCKET/artifacts/com.example.DeviceController/1.0.0/device_controller.py" } ] } ] }
Java
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.DeviceController", "ComponentVersion": "1.0.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "Receives commands from IoT Core and sends telemetry back to cloud", "ComponentPublisher": "[Your Company]", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.DeviceController:mqttproxy:1": { "policyDescription": "Allows access to subscribe to IoT Core topics", "operations": [ "aws.greengrass#SubscribeToIoTCore" ], "resources": [ "commands/device1" ] }, "com.example.DeviceController:mqttproxy:2": { "policyDescription": "Allows access to publish to IoT Core topics", "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "telemetry/device1" ] } } } } }, "ComponentDependencies": { "aws.greengrass.TokenExchangeService": { "VersionRequirement": ">=2.0.0", "DependencyType": "HARD" } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "java -jar {artifacts:path}/DeviceController.jar" }, "Artifacts": [ { "Uri": "s3://YOUR-BUCKET/artifacts/com.example.DeviceController/1.0.0/DeviceController.jar" } ] } ] }
JavaScript
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.DeviceController",
  "ComponentVersion": "1.0.0",
  "ComponentType": "aws.greengrass.generic",
  "ComponentDescription": "Receives commands from IoT Core and sends telemetry back to cloud",
  "ComponentPublisher": "[Your Company]",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.DeviceController:mqttproxy:1": {
            "policyDescription": "Allows access to subscribe to command topics from IoT Core",
            "operations": [
              "aws.greengrass#SubscribeToIoTCore"
            ],
            "resources": [
              "commands/device1"
            ]
          },
          "com.example.DeviceController:mqttproxy:2": {
            "policyDescription": "Allows access to publish telemetry to IoT Core",
            "operations": [
              "aws.greengrass#PublishToIoTCore"
            ],
            "resources": [
              "telemetry/*"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux",
        "runtime": "*"
      },
      "Lifecycle": {
        "run": "cd {artifacts:decompressedPath}/DeviceController && node DeviceController.js"
      },
      "Artifacts": [
        {
          "Uri": "s3://YOUR-BUCKET/artifacts/com.example.DeviceController/1.0.0/DeviceController.zip",
          "Unarchive": "ZIP"
        }
      ]
    }
  ]
}
C/C++
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.DeviceController",
  "ComponentVersion": "1.0.0",
  "ComponentType": "aws.greengrass.generic",
  "ComponentDescription": "Receives commands from IoT Core and sends telemetry back to cloud",
  "ComponentPublisher": "[Your Company]",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.DeviceController:mqttproxy:1": {
            "policyDescription": "Allows access to subscribe to IoT Core topics",
            "operations": ["aws.greengrass#SubscribeToIoTCore"],
            "resources": ["commands/device1"]
          },
          "com.example.DeviceController:mqttproxy:2": {
            "policyDescription": "Allows access to publish to IoT Core topics",
            "operations": ["aws.greengrass#PublishToIoTCore"],
            "resources": ["telemetry/device1"]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux",
        "runtime": "*"
      },
      "Lifecycle": {
        "run": "{artifacts:path}/device_controller"
      },
      "Artifacts": [
        {
          "Uri": "s3://YOUR-BUCKET/artifacts/com.example.DeviceController/1.0.0/device_controller"
        }
      ]
    }
  ]
}
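The recipes for all four languages use the same accessControl layout under aws.greengrass.ipc.mqttproxy, so it can help to check offline which IoT Core topics a recipe authorizes before you deploy it. The following is a minimal sketch of such a check, not part of any Greengrass SDK or CLI. The helper name summarize_mqtt_access and the trimmed RECIPE literal are hypothetical and used only for illustration; the sketch reads the JSON structure shown in the recipes and does not evaluate wildcards.

```python
import json

# Trimmed copy of the accessControl section from the recipes above (illustrative only).
RECIPE = json.loads("""
{
  "ComponentName": "com.example.DeviceController",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.DeviceController:mqttproxy:1": {
            "operations": ["aws.greengrass#SubscribeToIoTCore"],
            "resources": ["commands/device1"]
          },
          "com.example.DeviceController:mqttproxy:2": {
            "operations": ["aws.greengrass#PublishToIoTCore"],
            "resources": ["telemetry/device1"]
          }
        }
      }
    }
  }
}
""")


def summarize_mqtt_access(recipe):
    """Hypothetical helper: map each IPC operation to the topics listed for it."""
    policies = (
        recipe.get("ComponentConfiguration", {})
        .get("DefaultConfiguration", {})
        .get("accessControl", {})
        .get("aws.greengrass.ipc.mqttproxy", {})
    )
    summary = {}
    for policy in policies.values():
        # A policy may list several operations; record its resources under each one.
        for operation in policy.get("operations", []):
            summary.setdefault(operation, set()).update(policy.get("resources", []))
    return summary


if __name__ == "__main__":
    for operation, topics in sorted(summarize_mqtt_access(RECIPE).items()):
        print(operation, sorted(topics))
```

To check a real recipe, load the recipe file with json.load and pass the result to the helper, then compare the listed topics against the topics your component code subscribes and publishes to.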