로컬 메시지 게시/구독 - AWS IoT Greengrass

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

로컬 메시지 게시/구독

게시/구독(pubsub) 메시징을 사용하면 주제에 메시지를 보내고 주제에서 메시지를 받을 수 있습니다. 구성 요소가 주제에 메시지를 게시하여 다른 구성 요소에 메시지를 보낼 수 있습니다. 그러면 해당 주제를 구독하는 구성 요소가 수신하는 메시지에 대한 작업을 수행할 수 있습니다.

참고

이 게시/구독 IPC 서비스를 사용하여 AWS IoT Core MQTT를 게시하거나 구독할 수 없습니다. AWS IoT Core MQTT와 메시지를 교환하는 방법에 대한 자세한 내용은 AWS IoT Core MQTT 메시지 게시/구독 섹션을 참조하세요.

최소 SDK 버전

다음 표에는 로컬 주제에 대해 주고받는 메시지를 게시하고 구독하기 위해 사용해야 하는 AWS IoT Device SDK의 최소 버전이 나열되어 있습니다.

승인

사용자 지정 구성 요소에서 로컬 게시/구독 메시징을 사용하려면 구성 요소가 주제에 대한 메시지를 보내고 받을 수 있도록 허용하는 권한 부여 정책을 정의해야 합니다. 권한 부여 정책 정의에 대한 자세한 내용은 구성 요소에 IPC 작업을 수행할 수 있는 권한 부여 섹션을 참조하세요.

게시/구독 메시징에 대한 권한 부여 정책에는 다음 속성이 있습니다.

IPC 서비스 식별자: aws.greengrass.ipc.pubsub

작업 설명 리소스

aws.greengrass#PublishToTopic

구성 요소가 지정한 주제에 메시지를 게시할 수 있도록 허용합니다.

test/topic과 같은 주제 문자열입니다. 주제의 임의의 문자 조합과 일치시키려면 *를 사용합니다.

이 주제 문자열에서는 MQTT 주제 와일드카드(#+)를 지원하지 않습니다.

aws.greengrass#SubscribeToTopic

구성 요소가 지정하는 주제에 대한 메시지를 구독할 수 있도록 허용합니다.

test/topic과 같은 주제 문자열입니다. 주제의 임의의 문자 조합과 일치시키려면 *를 사용합니다.

Greengrass nucleus v2.6.0 이상에서는 MQTT 주제 와일드카드(#+)가 포함된 주제를 구독할 수 있습니다. 이 주제 문자열은 MQTT 주제 와일드카드를 리터럴 문자로 지원합니다. 예를 들어 구성 요소의 권한 부여 정책에서 test/topic/#에 대한 액세스 권한을 부여하는 경우 구성 요소는 test/topic/#는 구독할 수 있지만 test/topic/filter는 구독할 수 없습니다.

*

구성 요소가 지정하는 주제에 대한 메시지를 게시하고 구독할 수 있도록 허용합니다.

test/topic과 같은 주제 문자열입니다. 주제의 임의의 문자 조합과 일치시키려면 *를 사용합니다.

Greengrass nucleus v2.6.0 이상에서는 MQTT 주제 와일드카드(#+)가 포함된 주제를 구독할 수 있습니다. 이 주제 문자열은 MQTT 주제 와일드카드를 리터럴 문자로 지원합니다. 예를 들어 구성 요소의 권한 부여 정책에서 test/topic/#에 대한 액세스 권한을 부여하는 경우 구성 요소는 test/topic/#는 구독할 수 있지만 test/topic/filter는 구독할 수 없습니다.

권한 부여 정책 예제

다음 권한 부여 정책 예제를 참조하면 구성 요소의 권한 부여 정책을 구성하는 데 도움이 됩니다.

예 권한 부여 정책 예제

다음 권한 부여 정책 예제에서는 구성 요소가 모든 주제를 게시하고 구독할 수 있도록 허용합니다.

{ "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.MyLocalPubSubComponent:pubsub:1": { "policyDescription": "Allows access to publish/subscribe to all topics.", "operations": [ "aws.greengrass#PublishToTopic", "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } }

PublishToTopic

주제에 메시지를 게시합니다.

요청

이 작업의 요청에서는 다음 파라미터를 사용합니다.

topic

메시지를 게시할 주제입니다.

publishMessage/(Python:publish_message /)

게시할 메시지입니다. 이 객체 PublishMessage에는 다음 정보가 포함됩니다. jsonMessagebinaryMessage 중 하나를 지정해야 합니다.

jsonMessage/(Python:json_message /)

(선택 사항) JSON 메시지입니다. 이 객체 JsonMessage에는 다음 정보가 포함됩니다.

message

객체인 JSON 메시지입니다.

context

메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.

이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK의 최소 버전이 나열되어 있습니다.

참고

AWS IoT Greengrass 코어 소프트웨어는 PublishToTopicSubscribeToTopic 작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass 코어 소프트웨어는 구독할 때 메시지에서 이 컨텍스트 객체를 설정하고 게시하는 메시지에서는 이 컨텍스트 객체를 무시합니다.

이 객체 MessageContext에는 다음 정보가 포함됩니다.

topic

메시지가 게시된 주제입니다.

binaryMessage/(Python:binary_message /)

(선택 사항) 이진 메시지입니다. 이 객체 BinaryMessage에는 다음 정보가 포함됩니다.

message

Blob인 이진 메시지입니다.

context

메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.

이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK의 최소 버전이 나열되어 있습니다.

참고

AWS IoT Greengrass 코어 소프트웨어는 PublishToTopicSubscribeToTopic 작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass 코어 소프트웨어는 구독할 때 메시지에서 이 컨텍스트 객체를 설정하고 게시하는 메시지에서는 이 컨텍스트 객체를 무시합니다.

이 객체 MessageContext에는 다음 정보가 포함됩니다.

topic

메시지가 게시된 주제입니다.

응답

이 작업의 응답에는 어떠한 정보도 제공하지 않습니다.

예시

다음 예제에서는 사용자 지정 구성 요소 코드에서 이 작업을 직접 호출하는 방법을 보여줍니다.

Java (IPC client V2)
예: 이진 메시지 게시
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.model.BinaryMessage; import software.amazon.awssdk.aws.greengrass.model.PublishMessage; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.PublishToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import java.nio.charset.StandardCharsets; public class PublishToTopicV2 { public static void main(String[] args) { String topic = args[0]; String message = args[1]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { PublishToTopicV2.publishBinaryMessageToTopic(ipcClient, topic, message); System.out.println("Successfully published to topic: " + topic); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static PublishToTopicResponse publishBinaryMessageToTopic( GreengrassCoreIPCClientV2 ipcClient, String topic, String message) throws InterruptedException { BinaryMessage binaryMessage = new BinaryMessage().withMessage(message.getBytes(StandardCharsets.UTF_8)); PublishMessage publishMessage = new PublishMessage().withBinaryMessage(binaryMessage); PublishToTopicRequest publishToTopicRequest = new PublishToTopicRequest().withTopic(topic).withPublishMessage(publishMessage); return ipcClient.publishToTopic(publishToTopicRequest); } }
Python (IPC client V2)
예: 이진 메시지 게시
import sys import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( PublishMessage, BinaryMessage ) def main(): args = sys.argv[1:] topic = args[0] message = args[1] try: ipc_client = GreengrassCoreIPCClientV2() publish_binary_message_to_topic(ipc_client, topic, message) print('Successfully published to topic: ' + topic) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def publish_binary_message_to_topic(ipc_client, topic, message): binary_message = BinaryMessage(message=bytes(message, 'utf-8')) publish_message = PublishMessage(binary_message=binary_message) return ipc_client.publish_to_topic(topic=topic, publish_message=publish_message) if __name__ == '__main__': main()
C++
예: 이진 메시지 게시
#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); String message("Hello, World!"); int timeout = 10; PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } } return 0; }
JavaScript
예: 이진 메시지 게시
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {BinaryMessage, PublishMessage, PublishToTopicRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; class PublishToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; private readonly messageString : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.messageString = "<define_your_message_string>"; this.publishToTopic().then(r => console.log("Started workflow")); } private async publishToTopic() { try { this.ipcClient = await getIpcClient(); const binaryMessage : BinaryMessage = { message: this.messageString } const publishMessage : PublishMessage = { binaryMessage: binaryMessage } const request : PublishToTopicRequest = { topic: this.topic, publishMessage: publishMessage } this.ipcClient.publishToTopic(request).finally(() => console.log(`Published message ${publishMessage.binaryMessage?.message} to topic`)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const publishToTopic = new PublishToTopic();

SubscribeToTopic

주제에 대한 메시지를 구독합니다.

이 작업은 이벤트 메시지 스트림을 구독하는 구독 작업입니다. 이 작업을 사용하려면 이벤트 메시지, 오류 및 스트림 종료를 처리하는 함수를 사용하여 스트림 응답 핸들러를 정의합니다. 자세한 내용은 IPC 이벤트 스트림 구독 섹션을 참조하세요.

이벤트 메시지 유형: SubscriptionResponseMessage

요청

이 작업의 요청에서는 다음 파라미터를 사용합니다.

topic

구독할 주제입니다.

참고

Greengrass nucleus v2.6.0 이상에서 이 주제는 MQTT 주제 와일드카드(#+)를 지원합니다.

receiveMode/(Python:receive_mode /)

(선택 사항) 구성 요소가 자체에서 메시지를 수신하는지 여부를 지정하는 동작입니다. 이 동작을 변경하여 구성 요소가 자체 메시지에 대해 조치하도록 허용할 수 있습니다. 기본 동작은 주제에 MQTT 와일드카드가 포함되어 있는지 여부에 따라 달라집니다. 다음 옵션 중 하나를 선택합니다.

  • RECEIVE_ALL_MESSAGES - 구독하는 구성 요소의 메시지를 포함하여 주제와 일치하는 모든 메시지를 수신합니다.

    이 모드는 MQTT 와일드카드가 포함되지 않은 주제를 구독할 때 기본 옵션입니다.

  • RECEIVE_MESSAGES_FROM_OTHERS - 구독하는 구성 요소의 메시지를 제외하고 주제와 일치하는 모든 메시지를 수신합니다.

    이 모드는 MQTT 와일드카드가 포함된 주제를 구독할 때 기본 옵션입니다.

이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 수신 모드를 설정하기 위해 사용해야 하는 AWS IoT Device SDK의 최소 버전이 나열되어 있습니다.

응답

이 작업의 응답에는 다음 정보가 포함됩니다.

messages

메시지 스트림입니다. 이 객체 SubscriptionResponseMessage에는 다음 정보가 포함됩니다. 각 메시지에는 jsonMessage 또는 binaryMessage가 포함됩니다.

jsonMessage/(Python:json_message /)

(선택 사항) JSON 메시지입니다. 이 객체 JsonMessage에는 다음 정보가 포함됩니다.

message

객체인 JSON 메시지입니다.

context

메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.

이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK의 최소 버전이 나열되어 있습니다.

참고

AWS IoT Greengrass 코어 소프트웨어는 PublishToTopicSubscribeToTopic 작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass 코어 소프트웨어는 구독할 때 메시지에서 이 컨텍스트 객체를 설정하고 게시하는 메시지에서는 이 컨텍스트 객체를 무시합니다.

이 객체 MessageContext에는 다음 정보가 포함됩니다.

topic

메시지가 게시된 주제입니다.

binaryMessage/(Python:binary_message /)

(선택 사항) 이진 메시지입니다. 이 객체 BinaryMessage에는 다음 정보가 포함됩니다.

message

Blob인 이진 메시지입니다.

context

메시지가 게시된 주제와 같은 메시지의 컨텍스트입니다.

이 기능은 Greengrass nucleus 구성 요소의 v2.6.0 이상에서 사용할 수 있습니다. 다음 표에는 메시지 컨텍스트에 액세스하기 위해 사용해야 하는 AWS IoT Device SDK의 최소 버전이 나열되어 있습니다.

참고

AWS IoT Greengrass 코어 소프트웨어는 PublishToTopicSubscribeToTopic 작업에서 동일한 메시지 객체를 사용합니다. AWS IoT Greengrass 코어 소프트웨어는 구독할 때 메시지에서 이 컨텍스트 객체를 설정하고 게시하는 메시지에서는 이 컨텍스트 객체를 무시합니다.

이 객체 MessageContext에는 다음 정보가 포함됩니다.

topic

메시지가 게시된 주제입니다.

topicName/(Python:topic_name /)

메시지가 게시된 주제입니다.

참고

이 속성은 현재 사용되지 않습니다. Greengrass nucleus v2.6.0 이상에서는 SubscriptionResponseMessage에서 (jsonMessage|binaryMessage).context.topic 값을 가져와 메시지가 게시된 주제를 가져올 수 있습니다.

예시

다음 예제에서는 사용자 지정 구성 요소 코드에서 이 작업을 직접 호출하는 방법을 보여줍니다.

Java (IPC client V2)
예: 로컬 게시/구독 메시지 구독
package com.aws.greengrass.docs.samples.ipc; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.*; import java.nio.charset.StandardCharsets; import java.util.Optional; public class SubscribeToTopicV2 { public static void main(String[] args) { String topic = args[0]; try (GreengrassCoreIPCClientV2 ipcClient = GreengrassCoreIPCClientV2.builder().build()) { SubscribeToTopicRequest request = new SubscribeToTopicRequest().withTopic(topic); GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToTopicResponse, SubscribeToTopicResponseHandler> response = ipcClient.subscribeToTopic(request, SubscribeToTopicV2::onStreamEvent, Optional.of(SubscribeToTopicV2::onStreamError), Optional.of(SubscribeToTopicV2::onStreamClosed)); SubscribeToTopicResponseHandler responseHandler = response.getHandler(); System.out.println("Successfully subscribed to topic: " + topic); // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } // To stop subscribing, close the stream. responseHandler.closeStream(); } catch (Exception e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Exception occurred when using IPC."); } e.printStackTrace(); System.exit(1); } } public static void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { BinaryMessage binaryMessage = subscriptionResponseMessage.getBinaryMessage(); String message = new String(binaryMessage.getMessage(), StandardCharsets.UTF_8); String topic = binaryMessage.getContext().getTopic(); System.out.printf("Received new message on topic %s: %s%n", topic, message); } catch (Exception e) { System.err.println("Exception occurred while processing subscription response " + "message."); e.printStackTrace(); } } public static boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } public static void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } }
Python (IPC client V2)
예: 로컬 게시/구독 메시지 구독
import sys import time import traceback from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2 from awsiot.greengrasscoreipc.model import ( SubscriptionResponseMessage, UnauthorizedError ) def main(): args = sys.argv[1:] topic = args[0] try: ipc_client = GreengrassCoreIPCClientV2() # Subscription operations return a tuple with the response and the operation. _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event, on_stream_error=on_stream_error, on_stream_closed=on_stream_closed) print('Successfully subscribed to topic: ' + topic) # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') # To stop subscribing, close the stream. operation.close() except UnauthorizedError: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) traceback.print_exc() exit(1) except Exception: print('Exception occurred', file=sys.stderr) traceback.print_exc() exit(1) def on_stream_event(event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, 'utf-8') topic = event.binary_message.context.topic print('Received new message on topic %s: %s' % (topic, message)) except: traceback.print_exc() def on_stream_error(error: Exception) -> bool: print('Received a stream error.', file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed() -> None: print('Subscribe to topic stream closed.') if __name__ == '__main__': main()
C++
예: 로컬 게시/구독 메시지 구독
#include <iostream> #include </crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); // Handle JSON message. } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); // Handle binary message. } } } bool OnStreamError(OperationError *error) override { // Handle error. return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { // Handle close. } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { // Handle connection to IPC service. } void OnDisconnectCallback(RpcError error) override { // Handle disconnection from IPC service. } bool OnErrorCallback(RpcError error) override { // Handle IPC service connection error. return true; } }; int main() { ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } String topic("my/topic"); int timeout = 10; SubscribeToTopicRequest request; request.SetTopic(topic); //SubscribeResponseHandler streamHandler; auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (!response) { // Handle error. auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); (void)error; // Handle operation error. } else { // Handle RPC error. } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }
JavaScript
예: 로컬 게시/구독 메시지 구독
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc"; import {SubscribeToTopicRequest, SubscriptionResponseMessage} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model"; import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc"; class SubscribeToTopic { private ipcClient : greengrasscoreipc.Client private readonly topic : string; constructor() { // define your own constructor, e.g. this.topic = "<define_your_topic>"; this.subscribeToTopic().then(r => console.log("Started workflow")); } private async subscribeToTopic() { try { this.ipcClient = await getIpcClient(); const subscribeToTopicRequest : SubscribeToTopicRequest = { topic: this.topic, } const streamingOperation = this.ipcClient.subscribeToTopic(subscribeToTopicRequest, undefined); // conditionally apply options streamingOperation.on("message", (message: SubscriptionResponseMessage) => { // parse the message depending on your use cases, e.g. if(message.binaryMessage && message.binaryMessage.message) { const receivedMessage = message.binaryMessage?.message.toString(); } }); streamingOperation.on("streamError", (error : RpcError) => { // define your own error handling logic }) streamingOperation.on("ended", () => { // define your own logic }) await streamingOperation.activate(); // Keep the main thread alive, or the process will exit. await new Promise((resolve) => setTimeout(resolve, 10000)) } catch (e) { // parse the error depending on your use cases throw e } } } export async function getIpcClient(){ try { const ipcClient = greengrasscoreipc.createClient(); await ipcClient.connect() .catch(error => { // parse the error depending on your use cases throw error; }); return ipcClient } catch (err) { // parse the error depending on your use cases throw err } } // starting point const subscribeToTopic = new SubscribeToTopic();

예시

다음 예제를 사용하여 구성 요소에서 게시시/구독 IPC 서비스를 사용하는 방법을 알아봅니다.

다음 예제 레시피에서는 구성 요소가 모든 주제에 게시할 수 있도록 허용합니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherJava:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubPublisher.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherJava ComponentVersion: '1.0.0' ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubPublisherJava:pubsub:1': policyDescription: Allows access to publish to all topics. operations: - 'aws.greengrass#PublishToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubPublisher.jar

다음 예제 Java 애플리케이션에서는 게시/구독 IPC 서비스를 사용하여 메시지를 다른 구성 요소에 게시하는 방법을 보여줍니다.

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.model.*; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubPublisher { public static void main(String[] args) { String message = "Hello from the pub/sub publisher (Java)."; String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); while (true) { PublishToTopicRequest publishRequest = new PublishToTopicRequest(); PublishMessage publishMessage = new PublishMessage(); BinaryMessage binaryMessage = new BinaryMessage(); binaryMessage.setMessage(message.getBytes(StandardCharsets.UTF_8)); publishMessage.setBinaryMessage(binaryMessage); publishRequest.setPublishMessage(publishMessage); publishRequest.setTopic(topic); CompletableFuture<PublishToTopicResponse> futureResponse = ipcClient .publishToTopic(publishRequest, Optional.empty()).getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully published to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while publishing to topic: " + topic); } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while publishing to topic: " + topic); } else { System.err.println("Execution exception while publishing to topic: " + topic); } throw e; } Thread.sleep(5000); } } catch (InterruptedException e) { System.out.println("Publisher interrupted."); } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } }

다음 예제 레시피에서는 구성 요소가 모든 주제를 구독하도록 허용합니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberJava", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberJava:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "java -jar {artifacts:path}/PubSubSubscriber.jar" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberJava ComponentVersion: '1.0.0' ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: 'com.example.PubSubSubscriberJava:pubsub:1': policyDescription: Allows access to subscribe to all topics. operations: - 'aws.greengrass#SubscribeToTopic' resources: - '*' Manifests: - Lifecycle: Run: |- java -jar {artifacts:path}/PubSubSubscriber.jar

다음 예제 Java 애플리케이션에서는 게시/구독 IPC 서비스를 사용하여 다른 구성 요소의 메시지를 구독하는 방법을 보여줍니다.

/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ package com.example.ipc.pubsub; import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient; import software.amazon.awssdk.aws.greengrass.SubscribeToTopicResponseHandler; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicRequest; import software.amazon.awssdk.aws.greengrass.model.SubscribeToTopicResponse; import software.amazon.awssdk.aws.greengrass.model.SubscriptionResponseMessage; import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError; import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection; import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler; import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PubSubSubscriber { public static void main(String[] args) { String topic = "test/topic/java"; try (EventStreamRPCConnection eventStreamRPCConnection = IPCUtils.getEventStreamRpcConnection()) { GreengrassCoreIPCClient ipcClient = new GreengrassCoreIPCClient(eventStreamRPCConnection); SubscribeToTopicRequest subscribeRequest = new SubscribeToTopicRequest(); subscribeRequest.setTopic(topic); SubscribeToTopicResponseHandler operationResponseHandler = ipcClient .subscribeToTopic(subscribeRequest, Optional.of(new SubscribeResponseHandler())); CompletableFuture<SubscribeToTopicResponse> futureResponse = operationResponseHandler.getResponse(); try { futureResponse.get(10, TimeUnit.SECONDS); System.out.println("Successfully subscribed to topic: " + topic); } catch (TimeoutException e) { System.err.println("Timeout occurred while subscribing to topic: " + topic); throw e; } catch (ExecutionException e) { if (e.getCause() instanceof UnauthorizedError) { System.err.println("Unauthorized error while subscribing to topic: " + topic); } else { System.err.println("Execution exception while subscribing to topic: " + topic); } throw e; } // Keep the main thread alive, or the process will exit. try { while (true) { Thread.sleep(10000); } } catch (InterruptedException e) { System.out.println("Subscribe interrupted."); } } catch (Exception e) { System.err.println("Exception occurred when using IPC."); e.printStackTrace(); System.exit(1); } } private static class SubscribeResponseHandler implements StreamResponseHandler<SubscriptionResponseMessage> { @Override public void onStreamEvent(SubscriptionResponseMessage subscriptionResponseMessage) { try { String message = new String(subscriptionResponseMessage.getBinaryMessage() .getMessage(), StandardCharsets.UTF_8); System.out.println("Received new message: " + message); } catch (Exception e) { e.printStackTrace(); } } @Override public boolean onStreamError(Throwable error) { System.err.println("Received a stream error."); error.printStackTrace(); return false; // Return true to close stream, false to keep stream open. } @Override public void onStreamClosed() { System.out.println("Subscribe to topic stream closed."); } } }

다음 예제 레시피에서는 구성 요소가 모든 주제에 게시할 수 있도록 허용합니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherPython:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/pubsub_publisher.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/pubsub_publisher.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherPython ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherPython:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk Run: python3 -u {artifacts:path}/pubsub_publisher.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk Run: py -3 -u {artifacts:path}/pubsub_publisher.py

다음 예제 Python 애플리케이션에서는 게시/구독 IPC 서비스를 사용하여 메시지를 다른 구성 요소에 게시하는 방법을 보여줍니다.

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import ( PublishToTopicRequest, PublishMessage, BinaryMessage, UnauthorizedError ) topic = "test/topic/python" message = "Hello from the pub/sub publisher (Python)." TIMEOUT = 10 try: ipc_client = awsiot.greengrasscoreipc.connect() while True: request = PublishToTopicRequest() request.topic = topic publish_message = PublishMessage() publish_message.binary_message = BinaryMessage() publish_message.binary_message.message = bytes(message, "utf-8") request.publish_message = publish_message operation = ipc_client.new_publish_to_topic() operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully published to topic: ' + topic) except concurrent.futures.TimeoutError: print('Timeout occurred while publishing to topic: ' + topic, file=sys.stderr) except UnauthorizedError as e: print('Unauthorized error while publishing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while publishing to topic: ' + topic, file=sys.stderr) raise e time.sleep(5) except InterruptedError: print('Publisher interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

다음 예제 레시피에서는 구성 요소가 모든 주제를 구독하도록 허용합니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberPython", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberPython:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Platform": { "os": "linux" }, "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk", "Run": "python3 -u {artifacts:path}/pubsub_subscriber.py" } }, { "Platform": { "os": "windows" }, "Lifecycle": { "install": "py -3 -m pip install --user awsiotsdk", "Run": "py -3 -u {artifacts:path}/pubsub_subscriber.py" } } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberPython ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberPython:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Platform: os: linux Lifecycle: install: python3 -m pip install --user awsiotsdk Run: python3 -u {artifacts:path}/pubsub_subscriber.py - Platform: os: windows Lifecycle: install: py -3 -m pip install --user awsiotsdk Run: py -3 -u {artifacts:path}/pubsub_subscriber.py

다음 예제 Python 애플리케이션에서는 게시/구독 IPC 서비스를 사용하여 다른 구성 요소의 메시지를 구독하는 방법을 보여줍니다.

import concurrent.futures import sys import time import traceback import awsiot.greengrasscoreipc import awsiot.greengrasscoreipc.client as client from awsiot.greengrasscoreipc.model import ( SubscribeToTopicRequest, SubscriptionResponseMessage, UnauthorizedError ) topic = "test/topic/python" TIMEOUT = 10 class StreamHandler(client.SubscribeToTopicStreamHandler): def __init__(self): super().__init__() def on_stream_event(self, event: SubscriptionResponseMessage) -> None: try: message = str(event.binary_message.message, "utf-8") print("Received new message: " + message) except: traceback.print_exc() def on_stream_error(self, error: Exception) -> bool: print("Received a stream error.", file=sys.stderr) traceback.print_exc() return False # Return True to close stream, False to keep stream open. def on_stream_closed(self) -> None: print('Subscribe to topic stream closed.') try: ipc_client = awsiot.greengrasscoreipc.connect() request = SubscribeToTopicRequest() request.topic = topic handler = StreamHandler() operation = ipc_client.new_subscribe_to_topic(handler) operation.activate(request) future_response = operation.get_response() try: future_response.result(TIMEOUT) print('Successfully subscribed to topic: ' + topic) except concurrent.futures.TimeoutError as e: print('Timeout occurred while subscribing to topic: ' + topic, file=sys.stderr) raise e except UnauthorizedError as e: print('Unauthorized error while subscribing to topic: ' + topic, file=sys.stderr) raise e except Exception as e: print('Exception while subscribing to topic: ' + topic, file=sys.stderr) raise e # Keep the main thread alive, or the process will exit. try: while True: time.sleep(10) except InterruptedError: print('Subscribe interrupted.') except Exception: print('Exception occurred when using IPC.', file=sys.stderr) traceback.print_exc() exit(1)

다음 예제 레시피에서는 구성 요소가 모든 주제에 게시할 수 있도록 허용합니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubPublisherCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that publishes messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubPublisherCpp:pubsub:1": { "policyDescription": "Allows access to publish to all topics.", "operations": [ "aws.greengrass#PublishToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pubsub_publisher" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubPublisherCpp ComponentVersion: 1.0.0 ComponentDescription: A component that publishes messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubPublisherCpp:pubsub:1: policyDescription: Allows access to publish to all topics. operations: - aws.greengrass#PublishToTopic resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_pubsub_publisher" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubPublisherCpp/1.0.0/greengrassv2_pubsub_publisher Permission: Execute: OWNER

다음 예제 C++ 애플리케이션에서는 게시/구독 IPC 서비스를 사용하여 메시지를 다른 구성 요소에 게시하는 방법을 보여줍니다.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String message("Hello from the pub/sub publisher (C++)."); String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } while (true) { PublishToTopicRequest request; Vector<uint8_t> messageData({message.begin(), message.end()}); BinaryMessage binaryMessage; binaryMessage.SetMessage(messageData); PublishMessage publishMessage; publishMessage.SetBinaryMessage(binaryMessage); request.SetTopic(topic); request.SetPublishMessage(publishMessage); auto operation = ipcClient.NewPublishToTopic(); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully published to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to publish to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } std::this_thread::sleep_for(std::chrono::seconds(5)); } return 0; }

다음 예제 레시피에서는 구성 요소가 모든 주제를 구독하도록 허용합니다.

JSON
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PubSubSubscriberCpp", "ComponentVersion": "1.0.0", "ComponentDescription": "A component that subscribes to messages.", "ComponentPublisher": "Amazon", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.pubsub": { "com.example.PubSubSubscriberCpp:pubsub:1": { "policyDescription": "Allows access to subscribe to all topics.", "operations": [ "aws.greengrass#SubscribeToTopic" ], "resources": [ "*" ] } } } } }, "Manifests": [ { "Lifecycle": { "Run": "{artifacts:path}/greengrassv2_pub_sub_subscriber" }, "Artifacts": [ { "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber", "Permission": { "Execute": "OWNER" } } ] } ] }
YAML
--- RecipeFormatVersion: '2020-01-25' ComponentName: com.example.PubSubSubscriberCpp ComponentVersion: 1.0.0 ComponentDescription: A component that subscribes to messages. ComponentPublisher: Amazon ComponentConfiguration: DefaultConfiguration: accessControl: aws.greengrass.ipc.pubsub: com.example.PubSubSubscriberCpp:pubsub:1: policyDescription: Allows access to subscribe to all topics. operations: - aws.greengrass#SubscribeToTopic resources: - "*" Manifests: - Lifecycle: Run: "{artifacts:path}/greengrassv2_pub_sub_subscriber" Artifacts: - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.PubSubSubscriberCpp/1.0.0/greengrassv2_pub_sub_subscriber Permission: Execute: OWNER

다음 예제 C++ 애플리케이션에서는 게시/구독 IPC 서비스를 사용하여 다른 구성 요소의 메시지를 구독하는 방법을 보여줍니다.

#include <iostream> #include <aws/crt/Api.h> #include <aws/greengrass/GreengrassCoreIpcClient.h> using namespace Aws::Crt; using namespace Aws::Greengrass; class SubscribeResponseHandler : public SubscribeToTopicStreamHandler { public: virtual ~SubscribeResponseHandler() {} private: void OnStreamEvent(SubscriptionResponseMessage *response) override { auto jsonMessage = response->GetJsonMessage(); if (jsonMessage.has_value() && jsonMessage.value().GetMessage().has_value()) { auto messageString = jsonMessage.value().GetMessage().value().View().WriteReadable(); std::cout << "Received new message: " << messageString << std::endl; } else { auto binaryMessage = response->GetBinaryMessage(); if (binaryMessage.has_value() && binaryMessage.value().GetMessage().has_value()) { auto messageBytes = binaryMessage.value().GetMessage().value(); std::string messageString(messageBytes.begin(), messageBytes.end()); std::cout << "Received new message: " << messageString << std::endl; } } } bool OnStreamError(OperationError *error) override { std::cout << "Received an operation error: "; if (error->GetMessage().has_value()) { std::cout << error->GetMessage().value(); } std::cout << std::endl; return false; // Return true to close stream, false to keep stream open. } void OnStreamClosed() override { std::cout << "Subscribe to topic stream closed." << std::endl; } }; class IpcClientLifecycleHandler : public ConnectionLifecycleHandler { void OnConnectCallback() override { std::cout << "OnConnectCallback" << std::endl; } void OnDisconnectCallback(RpcError error) override { std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl; exit(-1); } bool OnErrorCallback(RpcError error) override { std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl; return true; } }; int main() { String topic("test/topic/cpp"); int timeout = 10; ApiHandle apiHandle(g_allocator); Io::EventLoopGroup eventLoopGroup(1); Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30); Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver); IpcClientLifecycleHandler ipcLifecycleHandler; GreengrassCoreIpcClient ipcClient(bootstrap); auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get(); if (!connectionStatus) { std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl; exit(-1); } SubscribeToTopicRequest request; request.SetTopic(topic); auto streamHandler = MakeShared<SubscribeResponseHandler>(DefaultAllocator()); auto operation = ipcClient.NewSubscribeToTopic(streamHandler); auto activate = operation->Activate(request, nullptr); activate.wait(); auto responseFuture = operation->GetResult(); if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) { std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl; exit(-1); } auto response = responseFuture.get(); if (response) { std::cout << "Successfully subscribed to topic: " << topic << std::endl; } else { // An error occurred. std::cout << "Failed to subscribe to topic: " << topic << std::endl; auto errorType = response.GetResultType(); if (errorType == OPERATION_ERROR) { auto *error = response.GetOperationError(); std::cout << "Operation error: " << error->GetMessage().value() << std::endl; } else { std::cout << "RPC error: " << response.GetRpcError() << std::endl; } exit(-1); } // Keep the main thread alive, or the process will exit. while (true) { std::this_thread::sleep_for(std::chrono::seconds(10)); } operation->Close(); return 0; }