本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
發佈/訂閱 AWS IoT Core MQTT 訊息
AWS IoT Core MQTT 訊息 IPC 服務可讓您在 之間傳送和接收 MQTT 訊息 AWS IoT Core。元件可以將訊息發佈到 AWS IoT Core 並訂閱主題,以對來自其他來源的 MQTT 訊息採取行動。如需 MQTT AWS IoT Core 實作的詳細資訊,請參閱《 AWS IoT Core 開發人員指南》中的 MQTT。
注意
此 MQTT 訊息 IPC 服務可讓您與 交換訊息 AWS IoT Core。如需如何在元件之間交換訊息的詳細資訊,請參閱 發佈/訂閱本機訊息。
最低 SDK 版本
下表列出 AWS IoT Device SDK 您必須用來發佈和訂閱 MQTT 訊息的最小版本 AWS IoT Core。
| SDK | 最低版本 |
|---|---|
|
v1.2.10 |
|
|
1.5.3 版 |
|
|
1.17.0 版 |
|
|
v1.12.0 |
Authorization
若要在自訂元件中使用 AWS IoT Core MQTT 訊息,您必須定義授權政策,以允許您的元件傳送和接收主題的訊息。如需定義授權政策的資訊,請參閱 授權元件執行 IPC 操作。
AWS IoT Core MQTT 訊息的授權政策具有下列屬性。
IPC 服務識別符: aws.greengrass.ipc.mqttproxy
| 作業 | Description | Resources |
|---|---|---|
|
|
允許元件在您指定的 MQTT 主題 AWS IoT Core 上發佈訊息至 。 |
主題字串,例如 |
|
|
允許元件在您指定的主題 AWS IoT Core 上訂閱來自 的訊息。 |
主題字串,例如 |
|
|
允許元件發佈和訂閱您指定主題的 AWS IoT Core MQTT 訊息。 |
主題字串,例如 |
MQTT 授權政策中的 AWS IoT Core MQTT 萬用字元
您可以在 MQTT IPC 授權政策中使用 AWS IoT Core MQTT 萬用字元。元件可以發佈和訂閱與您在授權政策中允許的主題篩選條件相符的主題。例如,如果元件的授權政策授予 的存取權test/topic/#,則該元件可以訂閱 test/topic/#,而且可以發佈和訂閱 test/topic/filter。
AWS IoT Core MQTT 授權政策中的配方變數
如果您使用 Greengrass 核的 v2.6.0 或更新版本,您可以在授權政策中使用{iot:thingName}配方變數。此功能可讓您為一組核心裝置設定單一授權政策,其中每個核心裝置只能存取包含自己的名稱的主題。例如,您可以允許元件存取下列主題資源。
devices/{iot:thingName}/messages
如需詳細資訊,請參閱配方變數及在合併更新中使用配方變數。
授權政策範例
您可以參考下列授權政策範例,協助您設定元件的授權政策。
範例具有無限制存取的授權政策範例
下列範例授權政策允許元件發佈和訂閱所有主題。
範例具有有限存取權的授權政策範例
下列範例授權政策允許元件發佈和訂閱兩個名為 factory/1/events和 的主題factory/1/actions。
範例一組核心裝置的範例授權政策
重要
此範例使用適用於 Greengrass 核元件 v2.6.0 和更新版本的功能。Greengrass 核 v2.6.0 在元件組態中新增了對大多數配方變數的支援{iot:thingName},例如 。
下列範例授權政策允許元件發佈和訂閱主題,其中包含執行元件的核心裝置名稱。
PublishToIoTCore
在主題 AWS IoT Core 上將 MQTT 訊息發佈至 。
當您發佈 MQTT 訊息到 時 AWS IoT Core,每秒有 100 筆交易的配額。如果您超過此配額,訊息會排入佇列,以便在 Greengrass 裝置上處理。也有每秒 512 Kb 的資料配額,以及每秒 20,000 個發佈的全帳戶配額 (部分為 2,000 AWS 區域)。如需 中 MQTT 訊息中介裝置限制的詳細資訊 AWS IoT Core,請參閱AWS IoT Core 訊息中介裝置和通訊協定限制和配額。
如果您超過這些配額,Greengrass 裝置會將訊息發佈限制為 AWS IoT Core。訊息會存放在記憶體中的多工緩衝處理程式中。根據預設,配置給多工緩衝處理站的記憶體為 2.5 Mb。如果多工緩衝處理程式填滿,則會拒絕新訊息。您可以增加多工緩衝處理的大小。如需詳細資訊,請參閱 Greengrass 核 文件中的 Configuration。為了避免填入多工緩衝處理程式並需要增加配置的記憶體,請將發佈請求限制為每秒不超過 100 個請求。
當您的應用程式需要以更高的速率或更大的訊息傳送訊息時,請考慮使用 串流管理員 傳送訊息至 Kinesis Data Streams。串流管理員元件旨在將大量資料傳輸至 AWS 雲端。如需詳細資訊,請參閱管理 Greengrass 核心裝置上的資料串流。
請求
此操作的請求具有下列參數:
topicName(Python:topic_name)-
要發佈訊息的主題。
qos-
要使用的 MQTT QoS。此列舉
QOS具有下列值:-
AT_MOST_ONCE– QoS 0。MQTT 訊息最多傳遞一次。 -
AT_LEAST_ONCE– QoS 1。MQTT 訊息至少傳遞一次。
-
payload-
(選用) 作為 Blob 的訊息承載。
使用 MQTT 5 Greengrass 核時,下列功能適用於 2.10.0 版和更新版本。當您使用 MQTT 3.1.1 時,會忽略這些功能。下表列出您必須用來存取這些功能的 AWS IoT 裝置 SDK 最低版本。
| SDK | 最低版本 |
|---|---|
| 適用於 Python 的 AWS IoT Device SDK |
1.15.0 版 |
| 適用於 JAVA 的 AWS IoT Device SDK |
1.13.0 版 |
| 適用於 C++ 的 AWS IoT Device SDK |
1.24.0 版 |
| 適用於 JavaScript 的 AWS IoT Device SDK |
1.13.0 版 |
payloadFormat-
(選用) 訊息承載的格式。如果您未設定
payloadFormat,則會假設類型為BYTES。列舉具有下列值:-
BYTES– 承載的內容是二進位 Blob。 -
UTF8– 承載的內容是 UTF8 字元字串。
-
retain-
(選用) 指出是否要在發佈
true時將 MQTT 保留選項設定為 。 userProperties-
(選用) 要傳送的應用程式特定
UserProperty物件清單。UserProperty物件的定義如下:UserProperty: key: string value: string messageExpiryIntervalSeconds-
(選用) 訊息過期並由伺服器刪除之前的秒數。如果未設定此值,則訊息不會過期。
correlationData-
(選用) 新增至請求的資訊,可用於將請求與回應建立關聯。
responseTopic-
(選用) 應該用於回應訊息的主題。
contentType-
(選用) 訊息內容類型的應用程式特定識別符。
回應
此操作不會在其回應中提供任何資訊。
範例
下列範例示範如何在自訂元件程式碼中呼叫此操作。
SubscribeToIoTCore
在主題或主題篩選條件 AWS IoT Core 上,從 訂閱 MQTT 訊息。 AWS IoT Greengrass 核心軟體會在元件生命週期結束時移除訂閱。
此操作是您訂閱事件訊息串流的訂閱操作。若要使用此操作,請使用處理事件訊息、錯誤和串流關閉的函數來定義串流回應處理常式。如需詳細資訊,請參閱訂閱 IPC 事件串流。
事件訊息類型: IoTCoreMessage
請求
此操作的請求具有下列參數:
topicName(Python:topic_name)-
要訂閱的主題。您可以使用 MQTT 主題萬用字元 (
#和+) 來訂閱多個主題。 qos-
要使用的 MQTT QoS。此列舉
QOS具有下列值:-
AT_MOST_ONCE– QoS 0。MQTT 訊息最多傳遞一次。 -
AT_LEAST_ONCE– QoS 1。MQTT 訊息至少傳遞一次。
-
回應
此操作的回應包含下列資訊:
messages-
MQTT 訊息的串流。此物件
IoTCoreMessage包含下列資訊:message-
MQTT 訊息。此物件
MQTTMessage包含下列資訊:topicName(Python:topic_name)-
發佈訊息的主題。
payload-
(選用) 作為 Blob 的訊息承載。
使用 MQTT 5 Greengrass 核時,下列功能適用於 2.10.0 版和更新版本。當您使用 MQTT 3.1.1 時,會忽略這些功能。下表列出您必須用來存取這些功能的 AWS IoT 裝置 SDK 最低版本。
SDK 最低版本 適用於 Python 的 AWS IoT Device SDK v2 1.15.0 版 適用於 JAVA 的 AWS IoT Device SDK v2 1.13.0 版 適用於 C++ 的 AWS IoT Device SDK v2 1.24.0 版 適用於 JavaScript 的 AWS IoT Device SDK v2 1.13.0 版 payloadFormat-
(選用) 訊息承載的格式。如果您未設定
payloadFormat,則會假設類型為BYTES。列舉具有下列值:-
BYTES– 承載的內容是二進位 Blob。 -
UTF8– 承載的內容是 UTF8 字元字串。
-
retain-
(選用) 指出是否要在發佈
true時將 MQTT 保留選項設定為 。 userProperties-
(選用) 要傳送的應用程式特定
UserProperty物件清單。UserProperty物件的定義如下:UserProperty: key: string value: string messageExpiryIntervalSeconds-
(選用) 訊息過期並由伺服器刪除之前的秒數。如果未設定此值,則訊息不會過期。
correlationData-
(選用) 新增至請求的資訊,可用於將請求與回應建立關聯。
responseTopic-
(選用) 應該用於回應訊息的主題。
contentType-
(選用) 訊息內容類型的應用程式特定識別符。
範例
下列範例示範如何在自訂元件程式碼中呼叫此操作。
範例
使用下列範例來了解如何在元件中使用 AWS IoT Core MQTT IPC 服務。
下列範例配方允許 元件發佈至所有主題。
下列 C++ 應用程式範例示範如何使用 AWS IoT Core MQTT IPC 服務來發佈訊息 AWS IoT Core。
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the Greengrass IPC MQTT publisher (C++)."); String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToIoTCoreRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); request.SetTopicName(topic); request.SetPayload(messageData); request.SetQos(qos); auto operation = ipcClient.NewPublishToIoTCore(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }
下列範例配方允許 元件訂閱所有主題。
下列 C++ 應用程式範例示範如何使用 AWS IoT Core MQTT IPC 服務來訂閱訊息 AWS IoT Core。
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler { public: virtual ~IoTCoreResponseHandler() {} private: void OnStreamEvent(IoTCoreMessage *response) override { auto message = response->GetMessage(); if (message.has_value() && message.value().GetPayload().has_value()) { auto messageBytes = message.value().GetPayload().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::string messageTopic = message.value().GetTopicName().value().c_str(); std::cout << "Received new message on topic: " << messageTopic << std::endl; std::cout << "Message: " << messageString << std::endl; } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to IoT Core stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); QOS qos = QOS_AT_LEAST_ONCE; int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToIoTCoreRequest request; request.SetTopicName(topic); request.SetQos(qos); auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
下列範例配方允許 元件發佈至所有主題。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherRust:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherRust/1.0.0/publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
下列範例 Rust 應用程式示範如何使用 AWS IoT Core MQTT IPC 服務來發佈訊息 AWS IoT Core。
use gg_sdk::{Qos, Sdk}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let message = b"Hello, World"; let topic = "my/topic"; let qos = Qos::AtLeastOnce; sdk.publish_to_iot_core(topic, message, qos) .expect("Failed to publish to topic"); println!("Successfully published to topic: {topic}"); }
下列範例配方允許 元件訂閱所有主題。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberRust", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberRust:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberRust/1.0.0/subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
下列範例 Rust 應用程式示範如何使用 AWS IoT Core MQTT IPC 服務來訂閱訊息 AWS IoT Core。
use gg_sdk::{Qos, Sdk}; use std::{thread, time::Duration}; fn main() { let sdk = Sdk::init(); sdk.connect().expect("Failed to establish IPC connection"); let topic = "my/topic"; let qos = Qos::AtLeastOnce; let callback = |topic: &str, payload: &[u8]| { let message = String::from_utf8_lossy(payload); println!("Received new message on topic {topic}: {message}"); }; let _sub = sdk .subscribe_to_iot_core(topic, qos, &callback) .expect("Failed to subscribe to topic"); println!("Successfully subscribed to topic: {topic}"); // Keep the main thread alive, or the process will exit. loop { thread::sleep(Duration::from_secs(10)); } }
下列範例配方允許 元件發佈至所有主題。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherC:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherC/1.0.0/sample_publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
下列範例 C 應用程式示範如何使用 AWS IoT Core MQTT IPC 服務來發佈訊息 AWS IoT Core。
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <stdio.h> #include <stdlib.h> int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer message = GG_STR("Hello, World"); GgBuffer topic = GG_STR("my/topic"); uint8_t qos = 1; err = ggipc_publish_to_iot_core(topic, message, qos); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to publish to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully published to topic: %.*s\n", (int) topic.len, topic.data ); }
下列範例配方允許 元件訂閱所有主題。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberC", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberC:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberC/1.0.0/sample_subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
下列範例 C 應用程式示範如何使用 AWS IoT Core MQTT IPC 服務來訂閱訊息 AWS IoT Core。
#include <gg/error.h> #include <gg/ipc/client.h> #include <gg/sdk.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> static void on_subscription_response( void *ctx, GgBuffer topic, GgBuffer payload, GgIpcSubscriptionHandle handle ) { (void) ctx; (void) handle; printf( "Received new message on topic %.*s: %.*s\n", (int) topic.len, topic.data, (int) payload.len, payload.data ); } int main(void) { gg_sdk_init(); GgError err = ggipc_connect(); if (err != GG_ERR_OK) { fprintf(stderr, "Failed to establish IPC connection.\n"); exit(-1); } GgBuffer topic = GG_STR("my/topic"); uint8_t qos = 1; GgIpcSubscriptionHandle handle; err = ggipc_subscribe_to_iot_core( topic, qos, on_subscription_response, NULL, &handle ); if (err != GG_ERR_OK) { fprintf( stderr, "Failed to subscribe to topic: %.*s\n", (int) topic.len, topic.data ); exit(-1); } printf( "Successfully subscribed to topic: %.*s\n", (int) topic.len, topic.data ); // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } // To stop subscribing, close the subscription handle. ggipc_close_subscription(handle); }
下列範例配方允許 元件發佈至所有主題。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCorePublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes MQTT messages to IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCorePublisherCpp:mqttproxy:1": { "policyDescription": "Allows access to publish to all topics.", "operations": ["aws.greengrass#PublishToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_publish_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/sample_cpp_publish_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
下列 C++ 應用程式範例示範如何使用 AWS IoT Core MQTT IPC 服務來發佈訊息 AWS IoT Core。
#include <gg/ipc/client.hpp> #include <iostream> int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view message = "Hello, World"; std::string_view topic = "my/topic"; uint8_t qos = 1; error = client.publish_to_iot_core(topic, message, qos); if (error) { std::cerr << "Failed to publish to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully published to topic: " << topic << "\n"; }
下列範例配方允許 元件訂閱所有主題。
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.IoTCoreSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.IoTCoreSubscriberCpp:mqttproxy:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": ["aws.greengrass#SubscribeToIoTCore"], "resources": ["*"] } } } } }, "Manifests": [ { "Platform": { "os": "linux", "runtime": "*" }, "Lifecycle": { "run": "{artifacts:path}/sample_cpp_subscribe_to_iot_core" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/sample_cpp_subscribe_to_iot_core", "Permission": { "Execute": "OWNER" } } ] } ] }
下列 C++ 應用程式範例示範如何使用 AWS IoT Core MQTT IPC 服務來訂閱訊息 AWS IoT Core。
#include <gg/ipc/client.hpp> #include <unistd.h> #include <iostream> class ResponseHandler : public gg::ipc::IotTopicCallback { void operator()( std::string_view topic, gg::Buffer payload, gg::ipc::Subscription &handle ) override { (void) handle; std::cout << "Received new message on topic " << topic << ": " << payload << "\n"; } }; int main() { auto &client = gg::ipc::Client::get(); auto error = client.connect(); if (error) { std::cerr << "Failed to establish IPC connection.\n"; exit(-1); } std::string_view topic = "my/topic"; uint8_t qos = 1; static ResponseHandler handler; error = client.subscribe_to_iot_core(topic, qos, handler); if (error) { std::cerr << "Failed to subscribe to topic: " << topic << "\n"; exit(-1); } std::cout << "Successfully subscribed to topic: " << topic << "\n"; // Keep the main thread alive, or the process will exit. while (1) { sleep(10); } }