本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
发布/订阅 AWS IoT Core MQTT 消息
AWS IoT Core MQTT 消息收发 IPC 服务允许您向 AWS IoT Core 发送 MQTT 消息以及从中接收 MQTT 消息。组件可以将消息发布到 AWS IoT Core,并订阅主题以处理来自其它来源的 MQTT 消息。有关 MQTT 的 AWS IoT Core 实现的更多信息,请参阅《AWS IoT Core 开发人员指南》中的 MQTT。
此 MQTT 消息收发 IPC 服务允许您与 AWS IoT Core 交换消息。有关如何在组件之间交换消息的更多信息,请参阅发布/订阅本地消息。
最低 SDK 版本
下表列出了与 AWS IoT Core 进行 MQTT 消息交互(发布和订阅)时必须使用的 AWS IoT Device SDK 最低版本。
授权
要在自定义组件中使用 AWS IoT Core MQTT 消息收发,您必须定义授权策略,允许您的组件发送和接收有关各个主题的消息。有关定义授权策略的信息,请参阅授权组件执行 IPC 操作。
AWS IoT Core MQTT 消息收发的授权策略具有以下属性。
IPC 服务标识符:aws.greengrass.ipc.mqttproxy
| 操作 |
描述 |
资源 |
|
aws.greengrass#PublishToIoTCore
|
允许组件向 AWS IoT Core 发布有关您指定的 MQTT 主题的消息。
|
允许访问所有主题的主题字符串(例如 test/topic 或 *)。您可以使用 MQTT 主题通配符(# 和 +)来匹配多个资源。
|
|
aws.greengrass#SubscribeToIoTCore
|
允许组件订阅来自 AWS IoT Core 的有关您指定的主题的消息。
|
允许访问所有主题的主题字符串(例如 test/topic 或 *)。您可以使用 MQTT 主题通配符(# 和 +)来匹配多个资源。
|
|
*
|
允许组件针对您指定的主题发布和订阅 AWS IoT Core MQTT 消息。
|
允许访问所有主题的主题字符串(例如 test/topic 或 *)。您可以使用 MQTT 主题通配符(# 和 +)来匹配多个资源。
|
AWS IoT Core MQTT 授权策略中的 MQTT 通配符
您可以在 AWS IoT Core MQTT IPC 授权策略中使用 MQTT 通配符。组件可以发布和订阅与您在授权策略中允许的主题筛选条件相匹配的主题。例如,如果组件的授权策略授予 test/topic/# 访问权限,则该组件可以订阅 test/topic/#,也可以发布和订阅 test/topic/filter。
AWS IoT Core MQTT 授权策略中的配方变量
如果您使用的是 Greengrass Nucleus v2.6.0 或更高版本,则可以在授权策略中使用 {iot:thingName} 配方变量。此功能使您可以为一组核心设备配置单个授权策略,其中每台核心设备只能访问包含自己名称的主题。例如,您可以允许组件访问以下主题资源。
devices/{iot:thingName}/messages
有关更多信息,请参阅配方变量和在合并更新中使用配方变量。
授权策略示例
您可以参考以下授权策略示例,帮助您为组件配置授权策略。
例 具有无限制访问权限的示例授权策略
以下示例授权策略允许组件发布和订阅所有主题。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.MyIoTCorePubSubComponent:mqttproxy:1:
policyDescription: Allows access to publish/subscribe to all topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
例 具有有限访问权限的示例授权策略
以下示例授权策略允许组件发布和订阅名为 factory/1/events 和 factory/1/actions 的两个主题。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to factory 1 topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/actions",
"factory/1/events"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
"com.example.MyIoTCorePubSubComponent:mqttproxy:1":
policyDescription: Allows access to publish/subscribe to factory 1 topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- factory/1/actions
- factory/1/events
例 核心设备组的示例授权策略
以下示例授权策略允许组件发布和订阅包含运行该组件的核心设备名称的主题。
- JSON
-
{
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.MyIoTCorePubSubComponent:mqttproxy:1": {
"policyDescription": "Allows access to publish/subscribe to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore",
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"factory/1/devices/{iot:thingName}/controls"
]
}
}
}
}
- YAML
-
---
accessControl:
aws.greengrass.ipc.mqttproxy:
"com.example.MyIoTCorePubSubComponent:mqttproxy:1":
policyDescription: Allows access to publish/subscribe to all topics.
operations:
- aws.greengrass#PublishToIoTCore
- aws.greengrass#SubscribeToIoTCore
resources:
- factory/1/devices/{iot:thingName}/controls
PublishToIoTCore
向 AWS IoT Core 发布有关某个主题的 MQTT 消息。
当您向 AWS IoT Core 发布 MQTT 消息时,每秒有 100 个事务的配额。如果超出此配额,消息将在 Greengrass 设备上排队等候处理。还有每秒 512 Kb 的数据配额,整个账户的配额为每秒 20000 次发布(某些 AWS 区域有 2000 次)。有关 AWS IoT Core 中 MQTT 消息代理限制的更多信息,请参阅 AWS IoT Core 消息代理和协议限制以及配额。
如果超出这些配额,Greengrass 设备将限制向 AWS IoT Core 发布消息。消息存储在内存中的后台处理程序中。默认情况下,分配给后台处理程序的内存为 2.5 Mb。如果后台处理程序已满,则新消息将被拒绝。您可以增大后台处理程序的大小。有关更多信息,请参阅 配置文档中的 Greengrass Nucleus。为避免填满后台处理程序并且需要增加分配的内存,请将发布请求限制为每秒不超过 100 个请求。
当应用程序需要以更高的速率发送消息或发送更大的消息时,可以考虑使用 流管理器 向 Kinesis Data Streams 发送消息。流管理器组件旨在将大量数据传输到 AWS 云。有关更多信息,请参阅 管理 Greengrass 核心设备上的数据流。
请求
此操作的请求包含以下参数:
topicName(Python:topic_name)
-
要向其发布消息的主题。
qos
-
要使用的 MQTT QoS。此枚举 QOS 包含以下值:
payload
-
(可选)以 Blob 形式显示的消息有效载荷。
使用 MQTT 5 时,以下功能适用于 Greengrass Nucleus v2.10.0 及更高版本。如果使用的是 MQTT 3.1.1,则会忽略这些功能。下表列出了访问这些功能必须使用的 AWS IoT 设备 SDK 的最低版本。
payloadFormat
-
(可选)消息有效载荷的格式。如果您未设置 payloadFormat,则假定类型为 BYTES。枚举包含以下值:
retain
-
(可选)指示是否在发布时将 MQTT 保留选项设置为 true。
userProperties
-
(可选)要发送的应用程序特定 UserProperty 对象的列表。UserProperty 对象定义如下:
UserProperty:
key: string
value: string
messageExpiryIntervalSeconds
-
(可选)消息过期并被服务器删除前等待的秒数。如果您未设置此值,消息不会过期。
correlationData
-
(可选)已添加到请求中的信息,可用于将请求与响应相关联。
responseTopic
-
(可选)应用于响应消息的主题。
contentType
-
(可选)消息内容类型的应用程序特定标识符。
响应
此操作在其响应中未提供任何信息。
示例
以下示例演示了如何在自定义组件代码中调用该操作。
- Java (IPC client V2)
-
例 示例:发布消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import java.nio.charset.StandardCharsets;
public class PublishToIoTCore {
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
ipcClientV2.publishToIoTCore(new PublishToIoTCoreRequest()
.withTopicName(topic)
.withPayload(message.getBytes(StandardCharsets.UTF_8))
.withQos(qos));
System.out.println("Successfully published to topic: " + topic);
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
- Python (IPC client V2)
-
例 示例:发布消息
此示例假设您使用的是适用于 Python v2 的 AWS IoT Device SDK 1.5.4 版本或更高版本。
import awsiot.greengrasscoreipc.clientv2 as clientV2
topic = 'my/topic'
qos = '1'
payload = 'Hello, World'
ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp = ipc_client.publish_to_iot_core(topic_name=topic, qos=qos, payload=payload)
ipc_client.close()
- Java (IPC client V1)
-
例 示例:发布消息
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.PublishToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.PublishToIoTCoreResponse;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.UnauthorizedError;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PublishToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
String message = args[1];
QOS qos = QOS.get(args[2]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
PublishToIoTCoreResponseHandler responseHandler =
PublishToIoTCore.publishBinaryMessageToTopic(ipcClient, topic, message, qos);
CompletableFuture<PublishToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully published to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while publishing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while publishing to topic: " + topic);
} else {
throw e;
}
}
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static PublishToIoTCoreResponseHandler publishBinaryMessageToTopic(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, String message, QOS qos) {
PublishToIoTCoreRequest publishToIoTCoreRequest = new PublishToIoTCoreRequest();
publishToIoTCoreRequest.setTopicName(topic);
publishToIoTCoreRequest.setPayload(message.getBytes(StandardCharsets.UTF_8));
publishToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.publishToIoTCore(publishToIoTCoreRequest, Optional.empty());
}
}
- Python (IPC client V1)
-
例 示例:发布消息
此示例假设您使用的是适用于 Python v2 的 AWS IoT Device SDK 1.5.4 版本或更高版本。
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
topic = "my/topic"
message = "Hello, World"
qos = QOS.AT_LEAST_ONCE
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = bytes(message, "utf-8")
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
- C++
-
例 示例:发布消息
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String message("Hello, World!");
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
}
return 0;
}
- JavaScript
-
例 示例:发布消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {QOS, PublishToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
class PublishToIoTCore {
private ipcClient: greengrasscoreipc.Client
private readonly topic: string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.publishToIoTCore().then(r => console.log("Started workflow"));
}
private async publishToIoTCore() {
try {
const request: PublishToIoTCoreRequest = {
topicName: this.topic,
qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
}
this.ipcClient = await getIpcClient();
await this.ipcClient.publishToIoTCore(request);
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const publishToIoTCore = new PublishToIoTCore();
SubscribeToIoTCore
订阅来自 AWS IoT Core 的有关主题或主题筛选条件的 MQTT 消息。当组件的生命周期结束时,AWS IoT Greengrass Core 软件会移除订阅。
此操作是一种订阅操作,您可以在其中订阅事件消息流。要使用此操作,请定义一个流响应处理程序,其中包含处理事件消息、错误和流关闭的函数。有关更多信息,请参阅 订阅 IPC 事件流。
事件消息类型:IoTCoreMessage
请求
此操作的请求包含以下参数:
topicName(Python:topic_name)
-
要订阅的主题。您可以使用 MQTT 主题通配符(# 和 +)来订阅多个主题。
qos
-
要使用的 MQTT QoS。此枚举 QOS 包含以下值:
响应
此操作的响应包含以下信息:
messages
-
MQTT 消息流。此对象 IoTCoreMessage 包含以下信息:
message
-
MQTT 消息。此对象 MQTTMessage 包含以下信息:
topicName (Python: topic_name)
-
消息被发布到的主题。
payload
-
(可选)以 Blob 形式显示的消息有效载荷。
使用 MQTT 5 时,以下功能适用于 Greengrass Nucleus v2.10.0 及更高版本。如果使用的是 MQTT 3.1.1,则会忽略这些功能。下表列出了访问这些功能必须使用的 AWS IoT 设备 SDK 的最低版本。
payloadFormat
-
(可选)消息有效载荷的格式。如果您未设置 payloadFormat,则假定类型为 BYTES。枚举包含以下值:
retain
-
(可选)指示是否在发布时将 MQTT 保留选项设置为 true。
userProperties
-
(可选)要发送的应用程序特定 UserProperty 对象的列表。UserProperty 对象定义如下:
UserProperty:
key: string
value: string
messageExpiryIntervalSeconds
-
(可选)消息过期并被服务器删除前等待的秒数。如果您未设置此值,消息不会过期。
correlationData
-
(可选)已添加到请求中的信息,可用于将请求与响应相关联。
responseTopic
-
(可选)应用于响应消息的主题。
contentType
-
(可选)消息内容类型的应用程序特定标识符。
示例
以下示例演示了如何在自定义组件代码中调用该操作。
- Java (IPC client V2)
-
例 示例:订阅消息
package com.aws.greengrass.docs.samples.ipc;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClientV2;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.QOS;
import software.amazon.awssdk.aws.greengrass.model.IoTCoreMessage;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreRequest;
import software.amazon.awssdk.aws.greengrass.model.SubscribeToIoTCoreResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public class SubscribeToIoTCore {
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
Consumer<IoTCoreMessage> onStreamEvent = ioTCoreMessage ->
System.out.printf("Received new message on topic %s: %s%n",
ioTCoreMessage.getMessage().getTopicName(),
new String(ioTCoreMessage.getMessage().getPayload(), StandardCharsets.UTF_8));
Optional<Function<Throwable, Boolean>> onStreamError =
Optional.of(e -> {
System.err.println("Received a stream error.");
e.printStackTrace();
return false;
});
Optional<Runnable> onStreamClosed = Optional.of(() ->
System.out.println("Subscribe to IoT Core stream closed."));
try (GreengrassCoreIPCClientV2 ipcClientV2 = GreengrassCoreIPCClientV2.builder().build()) {
SubscribeToIoTCoreRequest request = new SubscribeToIoTCoreRequest()
.withTopicName(topic)
.withQos(qos);
GreengrassCoreIPCClientV2.StreamingResponse<SubscribeToIoTCoreResponse, SubscribeToIoTCoreResponseHandler>
streamingResponse = ipcClientV2.subscribeToIoTCore(request, onStreamEvent, onStreamError, onStreamClosed);
streamingResponse.getResponse();
System.out.println("Successfully subscribed to topic: " + topic);
// Keep the main thread alive, or the process will exit.
while (true) {
Thread.sleep(10000);
}
// To stop subscribing, close the stream.
streamingResponse.getHandler().closeStream();
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
} catch (Exception e) {
System.err.println("Exception occurred.");
e.printStackTrace();
System.exit(1);
}
}
}
- Python (IPC client V2)
-
例 示例:订阅消息
此示例假设您使用的是适用于 Python v2 的 AWS IoT Device SDK 1.5.4 版本或更高版本。
import threading
import traceback
import awsiot.greengrasscoreipc.clientv2 as clientV2
topic = 'my/topic'
qos = '1'
def on_stream_event(event):
try:
topic_name = event.message.topic_name
message = str(event.message.payload, 'utf-8')
print(f'Received new message on topic {topic_name}: {message}')
except:
traceback.print_exc()
def on_stream_error(error):
# Return True to close stream, False to keep stream open.
return True
def on_stream_closed():
pass
ipc_client = clientV2.GreengrassCoreIPCClientV2()
resp, operation = ipc_client.subscribe_to_iot_core(
topic_name=topic,
qos=qos,
on_stream_event=on_stream_event,
on_stream_error=on_stream_error,
on_stream_closed=on_stream_closed
)
# Keep the main thread alive, or the process will exit.
event = threading.Event()
event.wait()
# To stop subscribing, close the operation stream.
operation.close()
ipc_client.close()
- Java (IPC client V1)
-
例 示例:订阅消息
package com.aws.greengrass.docs.samples.ipc;
import com.aws.greengrass.docs.samples.ipc.util.IPCUtils;
import software.amazon.awssdk.aws.greengrass.GreengrassCoreIPCClient;
import software.amazon.awssdk.aws.greengrass.SubscribeToIoTCoreResponseHandler;
import software.amazon.awssdk.aws.greengrass.model.*;
import software.amazon.awssdk.eventstreamrpc.EventStreamRPCConnection;
import software.amazon.awssdk.eventstreamrpc.StreamResponseHandler;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeToIoTCore {
public static final int TIMEOUT_SECONDS = 10;
public static void main(String[] args) {
String topic = args[0];
QOS qos = QOS.get(args[1]);
try (EventStreamRPCConnection eventStreamRPCConnection =
IPCUtils.getEventStreamRpcConnection()) {
GreengrassCoreIPCClient ipcClient =
new GreengrassCoreIPCClient(eventStreamRPCConnection);
StreamResponseHandler<IoTCoreMessage> streamResponseHandler =
new SubscriptionResponseHandler();
SubscribeToIoTCoreResponseHandler responseHandler =
SubscribeToIoTCore.subscribeToIoTCore(ipcClient, topic, qos,
streamResponseHandler);
CompletableFuture<SubscribeToIoTCoreResponse> futureResponse =
responseHandler.getResponse();
try {
futureResponse.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
System.out.println("Successfully subscribed to topic: " + topic);
} catch (TimeoutException e) {
System.err.println("Timeout occurred while subscribing to topic: " + topic);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnauthorizedError) {
System.err.println("Unauthorized error while subscribing to topic: " + topic);
} else {
throw e;
}
}
// Keep the main thread alive, or the process will exit.
try {
while (true) {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
System.out.println("Subscribe interrupted.");
}
// To stop subscribing, close the stream.
responseHandler.closeStream();
} catch (InterruptedException e) {
System.out.println("IPC interrupted.");
} catch (ExecutionException e) {
System.err.println("Exception occurred when using IPC.");
e.printStackTrace();
System.exit(1);
}
}
public static SubscribeToIoTCoreResponseHandler subscribeToIoTCore(GreengrassCoreIPCClient greengrassCoreIPCClient, String topic, QOS qos, StreamResponseHandler<IoTCoreMessage> streamResponseHandler) {
SubscribeToIoTCoreRequest subscribeToIoTCoreRequest = new SubscribeToIoTCoreRequest();
subscribeToIoTCoreRequest.setTopicName(topic);
subscribeToIoTCoreRequest.setQos(qos);
return greengrassCoreIPCClient.subscribeToIoTCore(subscribeToIoTCoreRequest,
Optional.of(streamResponseHandler));
}
public static class SubscriptionResponseHandler implements StreamResponseHandler<IoTCoreMessage> {
@Override
public void onStreamEvent(IoTCoreMessage ioTCoreMessage) {
try {
String topic = ioTCoreMessage.getMessage().getTopicName();
String message = new String(ioTCoreMessage.getMessage().getPayload(),
StandardCharsets.UTF_8);
System.out.printf("Received new message on topic %s: %s%n", topic, message);
} catch (Exception e) {
System.err.println("Exception occurred while processing subscription response " +
"message.");
e.printStackTrace();
}
}
@Override
public boolean onStreamError(Throwable error) {
System.err.println("Received a stream error.");
error.printStackTrace();
return false;
}
@Override
public void onStreamClosed() {
System.out.println("Subscribe to IoT Core stream closed.");
}
}
}
- Python (IPC client V1)
-
例 示例:订阅消息
此示例假设您使用的是适用于 Python v2 的 AWS IoT Device SDK 1.5.4 版本或更高版本。
import time
import traceback
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
TIMEOUT = 10
ipc_client = awsiot.greengrasscoreipc.connect()
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
def __init__(self):
super().__init__()
def on_stream_event(self, event: IoTCoreMessage) -> None:
try:
message = str(event.message.payload, "utf-8")
topic_name = event.message.topic_name
# Handle message.
except:
traceback.print_exc()
def on_stream_error(self, error: Exception) -> bool:
# Handle error.
return True # Return True to close stream, False to keep stream open.
def on_stream_closed(self) -> None:
# Handle close.
pass
topic = "my/topic"
qos = QOS.AT_MOST_ONCE
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
operation.activate(request)
future_response = operation.get_response()
future_response.result(TIMEOUT)
# Keep the main thread alive, or the process will exit.
while True:
time.sleep(10)
# To stop subscribing, close the operation stream.
operation.close()
- C++
-
例 示例:订阅消息
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string topicName = message.value().GetTopicName().value().c_str();
// Handle message.
}
}
bool OnStreamError(OperationError *error) override {
// Handle error.
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
// Handle close.
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
// Handle connection to IPC service.
}
void OnDisconnectCallback(RpcError error) override {
// Handle disconnection from IPC service.
}
bool OnErrorCallback(RpcError error) override {
// Handle IPC service connection error.
return true;
}
};
int main() {
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
String topic("my/topic");
QOS qos = QOS_AT_MOST_ONCE;
int timeout = 10;
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (!response) {
// Handle error.
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
(void)error;
// Handle operation error.
} else {
// Handle RPC error.
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}
- JavaScript
-
例 示例:订阅消息
import * as greengrasscoreipc from "aws-iot-device-sdk-v2/dist/greengrasscoreipc";
import {IoTCoreMessage, QOS, SubscribeToIoTCoreRequest} from "aws-iot-device-sdk-v2/dist/greengrasscoreipc/model";
import {RpcError} from "aws-iot-device-sdk-v2/dist/eventstream_rpc";
class SubscribeToIoTCore {
private ipcClient: greengrasscoreipc.Client
private readonly topic: string;
constructor() {
// define your own constructor, e.g.
this.topic = "<define_your_topic>";
this.subscribeToIoTCore().then(r => console.log("Started workflow"));
}
private async subscribeToIoTCore() {
try {
const request: SubscribeToIoTCoreRequest = {
topicName: this.topic,
qos: QOS.AT_LEAST_ONCE, // you can change this depending on your use case
}
this.ipcClient = await getIpcClient();
const streamingOperation = this.ipcClient.subscribeToIoTCore(request);
streamingOperation.on('message', (message: IoTCoreMessage) => {
// parse the message depending on your use cases, e.g.
if (message.message && message.message.payload) {
const receivedMessage = message.message.payload.toString();
}
});
streamingOperation.on('streamError', (error : RpcError) => {
// define your own error handling logic
});
streamingOperation.on('ended', () => {
// define your own logic
});
await streamingOperation.activate();
// Keep the main thread alive, or the process will exit.
await new Promise((resolve) => setTimeout(resolve, 10000))
} catch (e) {
// parse the error depending on your use cases
throw e
}
}
}
export async function getIpcClient(){
try {
const ipcClient = greengrasscoreipc.createClient();
await ipcClient.connect()
.catch(error => {
// parse the error depending on your use cases
throw error;
});
return ipcClient
} catch (err) {
// parse the error depending on your use cases
throw err
}
}
// starting point
const subscribeToIoTCore = new SubscribeToIoTCore();
示例
使用以下示例了解如何在组件中使用 AWS IoT Core MQTT IPC 服务。
以下示例配方允许该组件发布至所有主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCorePublisherCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that publishes MQTT messages to IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCorePublisherCpp:mqttproxy:1": {
"policyDescription": "Allows access to publish to all topics.",
"operations": [
"aws.greengrass#PublishToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_iotcore_publisher"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCorePublisherCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that publishes MQTT messages to IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCorePublisherCpp:mqttproxy:1:
policyDescription: Allows access to publish to all topics.
operations:
- aws.greengrass#PublishToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
Run: "{artifacts:path}/greengrassv2_iotcore_publisher"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCorePublisherCpp/1.0.0/greengrassv2_iotcore_publisher
Permission:
Execute: OWNER
以下示例 C++ 应用程序演示了如何使用 AWS IoT Core MQTT IPC 服务向 AWS IoT Core 发布消息。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String message("Hello from the Greengrass IPC MQTT publisher (C++).");
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
while (true) {
PublishToIoTCoreRequest request;
Vector<uint8_t> messageData({message.begin(), message.end()});
request.SetTopicName(topic);
request.SetPayload(messageData);
request.SetQos(qos);
auto operation = ipcClient.NewPublishToIoTCore();
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully published to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to publish to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
}
return 0;
}
以下示例配方允许该组件订阅所有主题。
- JSON
-
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.IoTCoreSubscriberCpp",
"ComponentVersion": "1.0.0",
"ComponentDescription": "A component that subscribes to MQTT messages from IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IoTCoreSubscriberCpp:mqttproxy:1": {
"policyDescription": "Allows access to subscribe to all topics.",
"operations": [
"aws.greengrass#SubscribeToIoTCore"
],
"resources": [
"*"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"Run": "{artifacts:path}/greengrassv2_iotcore_subscriber"
},
"Artifacts": [
{
"URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber",
"Permission": {
"Execute": "OWNER"
}
}
]
}
]
}
- YAML
-
---
RecipeFormatVersion: '2020-01-25'
ComponentName: com.example.IoTCoreSubscriberCpp
ComponentVersion: 1.0.0
ComponentDescription: A component that subscribes to MQTT messages from IoT Core.
ComponentPublisher: Amazon
ComponentConfiguration:
DefaultConfiguration:
accessControl:
aws.greengrass.ipc.mqttproxy:
com.example.IoTCoreSubscriberCpp:mqttproxy:1:
policyDescription: Allows access to subscribe to all topics.
operations:
- aws.greengrass#SubscribeToIoTCore
resources:
- "*"
Manifests:
- Lifecycle:
Run: "{artifacts:path}/greengrassv2_iotcore_subscriber"
Artifacts:
- URI: s3://amzn-s3-demo-bucket/artifacts/com.example.IoTCoreSubscriberCpp/1.0.0/greengrassv2_iotcore_subscriber
Permission:
Execute: OWNER
以下示例 C++ 应用程序演示了如何使用 AWS IoT Core MQTT IPC 服务订阅来自 AWS IoT Core 的消息。
#include <iostream>
#include <aws/crt/Api.h>
#include <aws/greengrass/GreengrassCoreIpcClient.h>
using namespace Aws::Crt;
using namespace Aws::Greengrass;
class IoTCoreResponseHandler : public SubscribeToIoTCoreStreamHandler {
public:
virtual ~IoTCoreResponseHandler() {}
private:
void OnStreamEvent(IoTCoreMessage *response) override {
auto message = response->GetMessage();
if (message.has_value() && message.value().GetPayload().has_value()) {
auto messageBytes = message.value().GetPayload().value();
std::string messageString(messageBytes.begin(), messageBytes.end());
std::string messageTopic = message.value().GetTopicName().value().c_str();
std::cout << "Received new message on topic: " << messageTopic << std::endl;
std::cout << "Message: " << messageString << std::endl;
}
}
bool OnStreamError(OperationError *error) override {
std::cout << "Received an operation error: ";
if (error->GetMessage().has_value()) {
std::cout << error->GetMessage().value();
}
std::cout << std::endl;
return false; // Return true to close stream, false to keep stream open.
}
void OnStreamClosed() override {
std::cout << "Subscribe to IoT Core stream closed." << std::endl;
}
};
class IpcClientLifecycleHandler : public ConnectionLifecycleHandler {
void OnConnectCallback() override {
std::cout << "OnConnectCallback" << std::endl;
}
void OnDisconnectCallback(RpcError error) override {
std::cout << "OnDisconnectCallback: " << error.StatusToString() << std::endl;
exit(-1);
}
bool OnErrorCallback(RpcError error) override {
std::cout << "OnErrorCallback: " << error.StatusToString() << std::endl;
return true;
}
};
int main() {
String topic("test/topic/cpp");
QOS qos = QOS_AT_LEAST_ONCE;
int timeout = 10;
ApiHandle apiHandle(g_allocator);
Io::EventLoopGroup eventLoopGroup(1);
Io::DefaultHostResolver socketResolver(eventLoopGroup, 64, 30);
Io::ClientBootstrap bootstrap(eventLoopGroup, socketResolver);
IpcClientLifecycleHandler ipcLifecycleHandler;
GreengrassCoreIpcClient ipcClient(bootstrap);
auto connectionStatus = ipcClient.Connect(ipcLifecycleHandler).get();
if (!connectionStatus) {
std::cerr << "Failed to establish IPC connection: " << connectionStatus.StatusToString() << std::endl;
exit(-1);
}
SubscribeToIoTCoreRequest request;
request.SetTopicName(topic);
request.SetQos(qos);
auto streamHandler = MakeShared<IoTCoreResponseHandler>(DefaultAllocator());
auto operation = ipcClient.NewSubscribeToIoTCore(streamHandler);
auto activate = operation->Activate(request, nullptr);
activate.wait();
auto responseFuture = operation->GetResult();
if (responseFuture.wait_for(std::chrono::seconds(timeout)) == std::future_status::timeout) {
std::cerr << "Operation timed out while waiting for response from Greengrass Core." << std::endl;
exit(-1);
}
auto response = responseFuture.get();
if (response) {
std::cout << "Successfully subscribed to topic: " << topic << std::endl;
} else {
// An error occurred.
std::cout << "Failed to subscribe to topic: " << topic << std::endl;
auto errorType = response.GetResultType();
if (errorType == OPERATION_ERROR) {
auto *error = response.GetOperationError();
std::cout << "Operation error: " << error->GetMessage().value() << std::endl;
} else {
std::cout << "RPC error: " << response.GetRpcError() << std::endl;
}
exit(-1);
}
// Keep the main thread alive, or the process will exit.
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(10));
}
operation->Close();
return 0;
}