

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 导出支持的 AWS 云 目标的配置
<a name="stream-export-configurations"></a>

用户定义的 Greengrass 组件在流管理器 SDK 中使用 `StreamManagerClient` 与流管理器交互。当组件[创建流](work-with-streams.md#streammanagerclient-create-message-stream)或[更新流](work-with-streams.md#streammanagerclient-create-message-stream)时，它会传递一个表示流属性的 `MessageStreamDefinition` 对象，包括导出定义。`ExportDefinition` 对象包含为流定义的导出配置。流管理器使用这些导出配置来确定将流导出到何处以及如何导出。

![\[ExportDefinition 属性类型的对象模型图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v2/developerguide/images/stream-manager-exportconfigs.png)


您可以为一个流定义零个或多个导出配置，包括针对单个目标类型的多个导出配置。例如，您可以将一个流导出到两个 AWS IoT Analytics 通道和一个 Kinesis 数据流。

对于失败的导出尝试，流管理器会持续重试将数据导出到，间隔不超过五分钟。 AWS 云 重试次数没有最大限制。

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定，或不支持在生产环境中使用。

**Topics**
+ [AWS IoT Analytics 频道](#export-to-iot-analytics)
+ [Amazon Kinesis data streams](#export-to-kinesis)
+ [AWS IoT SiteWise 资产属性](#export-to-iot-sitewise)
+ [Amazon S3 对象](#export-to-s3)

您有责任维护这些 AWS 云 资源。

## AWS IoT Analytics 频道
<a name="export-to-iot-analytics"></a>

直播管理器支持自动导出到 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 允许您对数据进行高级分析，以帮助做出业务决策和改进机器学习模型。有关更多信息，请参阅[什么是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) 在《*AWS IoT Analytics 用户指南》*中。

在流管理器 SDK 中，您的 Greengrass 组件使用 `IoTAnalyticsConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python SDK 中的 [Io TAnalytics Config](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTAnalyticsConfig)
+ Java SDK 中的 [Io TAnalytics Config](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html)
+ Node.js SDK 中的 [Io TAnalytics Config](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html)

### 要求
<a name="export-to-iot-analytics-reqs"></a>

此导出目的地具有以下要求：
+ 中的目标频道 AWS IoT Analytics 必须与 Greengrass 核心设备相同 AWS 账户 。 AWS 区域 
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对目标通道的 `iotanalytics:BatchPutMessage` 权限。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "iotanalytics:BatchPutMessage"
        ],
        "Resource": [
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

要创建导出到的流 AWS IoT Analytics，Greengrass [组件会创建一个包含一个](work-with-streams.md#streammanagerclient-create-message-stream)或多个对象的导出定义的流。`IoTAnalyticsConfig`此对象定义导出设置，例如目标频道、批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收数据时，它们会[将包含大量数据的消息附加](work-with-streams.md#streammanagerclient-append-message)到目标流。

然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

## Amazon Kinesis data streams
<a name="export-to-kinesis"></a>

流管理器支持自动导出到 Amazon Kinesis Data Streams。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用于聚合大量数据并将其加载到数据仓库 MapReduce 或集群中。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。

在流管理器 SDK 中，您的 Greengrass 组件使用 `KinesisConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.KinesisConfig)在 Python 软件开发工具包中
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html)在 Java 开发工具包中
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html)在 Node.js SDK 中

### 要求
<a name="export-to-kinesis-reqs"></a>

此导出目的地具有以下要求：
+ Kinesis Data Streams 中的目标流必须与 Greengrass 核心设备 AWS 账户 相同 AWS 区域 。
+ （推荐）流管理器 v2.2.1 提高了将流导出至 Kinesis Data Streams 目的地的性能。要使用该最新版本中的改进之处，请将[流管理器组件](stream-manager-component.md)升级至 v2.2.1，然后使用 [Greengrass 令牌交换角色](device-service-role.md)中的 `kinesis:ListShards` 策略。
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对数据流的 `kinesis:PutRecords` 权限。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "kinesis:PutRecords"
        ],
        "Resource": [
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

为创建导出到 Kinesis Data Streams 的流，您的 Greengrass 组件会使用包含一个或多个 `KinesisConfig` 对象的导出定义[创建流](work-with-streams.md#streammanagerclient-create-message-stream)。此对象定义导出设置，例如目标数据流、批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收数据时，它们会[将包含大量数据的消息附加](work-with-streams.md#streammanagerclient-append-message)到目标流。然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

流管理器会为上传到 Amazon Kinesis 的每条记录生成一个唯一的随机 UUID 作为分区键。

## AWS IoT SiteWise 资产属性
<a name="export-to-iot-sitewise"></a>

直播管理器支持自动导出到 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 允许您大规模收集、组织和分析来自工业设备的数据。有关更多信息，请参阅[什么是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) 在《*AWS IoT SiteWise 用户指南》*中。

在流管理器 SDK 中，您的 Greengrass 组件使用 `IoTSiteWiseConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python SDK TSite WiseConfig 中的 [Io](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTSiteWiseConfig)
+ Java SDK TSite WiseConfig 中的 [Io](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html)
+ Node.js SDK TSite WiseConfig 中的 [Io](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html)

**注意**  
AWS 还提供了 AWS IoT SiteWise 组件，这些组件提供了一个预先构建的解决方案，可用于流式传输来自 OPC-UA 来源的数据。有关更多信息，请参阅 [物联网 SiteWise OPC UA 采集器](iotsitewise-opcua-collector-component.md)。

### 要求
<a name="export-to-iot-sitewise-reqs"></a>

此导出目的地具有以下要求：
+ 中的目标资产属性 AWS IoT SiteWise 必须与 Greengrass 核心设备相同 AWS 账户 。 AWS 区域 
**注意**  
有关 AWS IoT SiteWise 支持的列表，请参阅《 AWS 区域*AWS 一般参考*》中的[AWS IoT SiteWise 终端节点和配额](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region)。
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对目标资产属性的 `iotsitewise:BatchPutAssetPropertyValue` 权限。以下示例策略使用 `iotsitewise:assetHierarchyPath` 条件键来授予对目标根资产及其子项的访问权限。您可以`Condition`从策略中删除，以允许访问您的所有 AWS IoT SiteWise 资产或指定单个 ARNs 资产。

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
         "Effect": "Allow",
         "Action": "iotsitewise:BatchPutAssetPropertyValue",
         "Resource": "*",
         "Condition": {
           "StringLike": {
             "iotsitewise:assetHierarchyPath": [
               "/root node asset ID",
               "/root node asset ID/*"
             ]
           }
         }
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

  有关重要的安全信息，请参阅《*AWS IoT SiteWise 用户指南*》中的[ BatchPutAssetPropertyValue 授权](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action)。

### 导出到 AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

要创建导出到的流 AWS IoT SiteWise，Greengrass [组件会创建一个包含一个](work-with-streams.md#streammanagerclient-create-message-stream)或多个对象的导出定义的流。`IoTSiteWiseConfig`此对象定义导出设置，例如批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收资产属性数据时，它们会将包含数据的消息附加到目标流。消息是 JSON 序列化的 `PutAssetPropertyValueEntry` 对象，其中包含一个或多个资产属性的属性值。有关更多信息，请参阅为 AWS IoT SiteWise 导出目标[追加消息](work-with-streams.md#streammanagerclient-append-message-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>当您向发送数据时 AWS IoT SiteWise，您的数据必须满足`BatchPutAssetPropertyValue`操作的要求。有关更多信息，请参阅《AWS IoT SiteWise API Reference》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

您可以调整流管理器设置和 Greengrass 组件逻辑来设计您的导出策略。例如：
+ 对于近乎实时的导出，请设置较低的批量大小和间隔设置，并在收到数据时将数据追加到流中。
+ 为了优化批处理、缓解带宽限制或最大限度地降低成本，Greengrass 组件可以在 timestamp-quality-value将数据追加到流之前，为单个资产属性汇集接收到的 (TQV) 数据点。一种策略是在一条消息中批量输入多达 10 种不同的财产资产组合或属性别名，而不是为同一个属性发送多个条目。这有助于流管理器保持在[AWS IoT SiteWise 配额](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html)范围内。

## Amazon S3 对象
<a name="export-to-s3"></a>

流管理器支持自动导出到 Amazon S3。<a name="s3-export-destination"></a>您可以使用 Amazon S3 存储和检索大量的数据。有关更多信息，请参阅 [Amazon Simple Storage Service 开发人员指南](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)中的*什么是 Amazon S3？*。

在流管理器 SDK 中，您的 Greengrass 组件使用 `S3ExportTaskExecutorConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python 软件开发工具包ExportTaskExecutorConfig中的 S@@ [3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskExecutorConfig)
+ Java 开发工具包ExportTaskExecutorConfig中的 [S3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html)
+ Node.js 软件开发工具包ExportTaskExecutorConfig中的 [S3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html)

### 要求
<a name="export-to-s3-reqs"></a>

此导出目的地具有以下要求：
+ 目标 Amazon S3 存储桶必须与 Greengrass 核心设备 AWS 账户 相同。
+ 如果在 **Greengrass 容器**模式下运行的 Lambda 函数将输入文件写入输入文件目录，您必须将该目录作为卷挂载到具有写入权限的容器中。这样可以确保将文件写入根文件系统，并对容器外部运行的流管理器组件可见。
+ 如果 Docker 容器组件将输入文件写入输入文件目录，您必须将该目录作为卷挂载到具有写入权限的容器中。这样可以确保将文件写入根文件系统，并对容器外部运行的流管理器组件可见。
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对目标存储桶具有以下权限。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:AbortMultipartUpload",
          "s3:ListMultipartUploadParts"
        ],
        "Resource": [
          "arn:aws:s3:::bucket-1-name/*",
          "arn:aws:s3:::bucket-2-name/*"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 Amazon S3
<a name="export-streams-to-s3"></a>

为创建导出到 Amazon S3 的流，您的 Greengrass 组件使用 `S3ExportTaskExecutorConfig` 对象来配置导出策略。该策略定义了导出设置，例如分段上传阈值和优先级。对于 Amazon S3 导出，流管理器会上传它从核心设备上的本地文件中读取的数据。要启动上传，您的 Greengrass 组件会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按照任务附加到流中的顺序运行任务。

**注意**  
 <a name="bucket-not-key-must-exist"></a>目标存储桶必须已经存在于您的中 AWS 账户。如果指定密钥的对象不存在，则流管理器会为您创建该对象。

Stream Manager 使用分段上传阈值属性、[最小分段大小](configure-stream-manager.md#stream-manager-minimum-part-size)设置和输入文件的大小来确定如何上传数据。分段上传阈值必须大于或等于最小分段大小。如果要并行上传数据，则可以创建多个流。

指定您的目标 Amazon S3 对象的密钥可以在`!{timestamp:value}`占位符中包含有效的 [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 字符串。您可以使用这些时间戳占位符根据输入文件数据的上传时间对 Amazon S3 中的数据进行分区。例如，以下键名解析为诸如 `my-key/2020/12/31/data.txt` 之类的值。

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**注意**  
如果要监控流的导出状态，请先创建一个状态流，然后将导出流配置为使用该状态流。有关更多信息，请参阅 [监控导出任务](#monitor-export-status-s3)。

#### 管理输入数据
<a name="manage-s3-input-data"></a>

您可以编写代码，供物联网应用用来管理输入数据的生命周期。以下示例工作流程显示了如何使用 Greengrass 组件来管理这些数据。

1. 本地进程从设备或外围设备接收数据，然后将数据写入核心设备上目录中的文件。这些是流管理器的输入文件。

1. Greengrass 组件会扫描该目录，并在创建新文件时[将导出任务附加](work-with-streams.md#streammanagerclient-append-message-export-task)到目标流。该任务是一个 JSON 序列化 `S3ExportTaskDefinition` 对象，用于指定输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选的用户元数据。

1. 流管理器读取输入文件并按照附加任务的顺序将数据导出到 Amazon S3。<a name="bucket-not-key-must-exist"></a>目标存储桶必须已经存在于您的中 AWS 账户。如果指定密钥的对象不存在，则流管理器会为您创建该对象。

1. Greengrass 组件从状态流[读取消息](work-with-streams.md#streammanagerclient-read-messages)以监控导出状态。导出任务完成后，Greengrass 组件可以删除相应的输入文件。有关更多信息，请参阅 [监控导出任务](#monitor-export-status-s3)。

### 监控导出任务
<a name="monitor-export-status-s3"></a>

您可以编写代码，让物联网应用程序监控您的 Amazon S3 导出状态。您的 Greengrass 组件必须创建状态流，然后配置导出流以将状态更新写入状态流。单个状态流可以接收来自导出到 Amazon S3 的多个流的状态更新。

首先，[创建一个流](work-with-streams.md#streammanagerclient-create-message-stream)以用作状态流。您可以为流配置大小和保留策略，以控制状态消息的生命周期。例如：
+ 如果您不想存储状态消息，请将 `Persistence` 设置为 `Memory`。
+ 将 `StrategyOnFull` 设置为 `OverwriteOldestData`，这样新的状态消息就不会丢失。

然后，创建或更新导出流以使用状态流。具体而言，设置流 `S3ExportTaskExecutorConfig` 导出配置的状态配置属性。此设置会告诉流管理器将有关导出任务的状态消息写入状态流。在 `StatusConfig` 对象中，指定状态流的名称和详细程度。以下支持的值范围从最低 verbose (`ERROR`) 到最长 verbose (`TRACE`) 不等。默认值为 `INFO`。
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

以下示例工作流程显示了 Greengrass 组件如何使用状态流来监控导出状态。

1. 如前面的工作流程所述，Greengrass 组件[将导出任务附加](work-with-streams.md#streammanagerclient-append-message-export-task)到流，后者配置为将有关导出任务的状态消息写入状态流。附加操作返回一个表示任务 ID 的序列号。

1. Greengrass 组件按顺序[读取](work-with-streams.md#streammanagerclient-read-messages)状态流中的消息，然后根据流名称和任务 ID 或消息上下文中的导出任务属性筛选消息。例如，Greengrass 组件可以按导出任务的输入文件 URL 进行筛选，该文件由消息上下文中的 `S3ExportTaskDefinition` 对象表示。

   以下状态代码指示导出任务已达到完成状态：
   + `Success`。上传已成功完成。
   + `Failure`。流管理器遇到错误，例如，指定的存储桶不存在。解决问题后，您可以再次将导出任务追加到流中。
   + `Canceled`。 由于流或导出定义已删除，或者任务的 time-to-live (TTL) 期限已过期，该任务已停止。
**注意**  
该任务的状态也可能为 `InProgress` 或 `Warning`。当事件返回不影响任务执行的错误时，流管理器会发出警告。例如，如果无法清理部分上传，则会返回警告。

1. 导出任务完成后，Greengrass 组件可以删除相应的输入文件。

以下示例显示 Greengrass 组件如何读取和处理状态消息。

------
#### [ Python ]

```
import time
from stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[阅读消息 \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.StatusMessage)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

 try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[阅读消息 \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require(*'aws-greengrass-stream-manager-sdk'*);

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[阅读消息 \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages)

------