

终止支持通知：2026 年 10 月 7 日， AWS 将停止对的支持。 AWS IoT Greengrass Version 1 2026 年 10 月 7 日之后，您将无法再访问这些 AWS IoT Greengrass V1 资源。如需了解更多信息，请访问[迁移自 AWS IoT Greengrass Version 1](https://docs.aws.amazon.com/greengrass/v2/developerguide/migrate-from-v1.html)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 管理 AWS IoT Greengrass 核心上的数据流
<a name="stream-manager"></a>

AWS IoT Greengrass 流管理器可以更轻松、更可靠地将大量物联网数据传输到. AWS 云流管理器在本地处理数据流并 AWS 云 自动将其导出到。此功能与常见的边缘场景集成，例如机器学习 (ML) 推理，在这种场景中，在将数据导出到 AWS 云 或本地存储目标之前，先在本地处理和分析数据。

流管理器简化了应用程序开发。IoT 应用程序可以使用标准化机制来处理大容量流和管理本地数据保留策略，而不是构建自定义流管理功能。IoT 应用程序可以读取和写入流。它们可以在每个流的基础之上定义存储类型、大小和数据的保留策略，以控制流管理器处理和导出流的方式。

流管理器设计为在间歇性或有限连接的环境中工作。您可以定义带宽使用、超时行为以及当核心连接或断开连接时如何处理流数据。对于关键数据，您可以设置优先级以控制流导出到 AWS 云的顺序。

您可以配置自动导出到以 AWS 云 进行存储或进一步处理和分析。直播管理器支持导出到以下 AWS 云 目的地。<a name="supported-export-destinations"></a>
+ 频道进来 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 允许您对数据进行高级分析，以帮助做出业务决策和改进机器学习模型。有关更多信息，请参阅[什么是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) 在《*AWS IoT Analytics 用户指南》*中。
+ Kinesis Data Streams 中的流。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用于聚合大量数据并将其加载到数据仓库或 map-reduce 集群中。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。
+ 中的资产属性 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 允许您大规模收集、组织和分析来自工业设备的数据。有关更多信息，请参阅[什么是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) 在《*AWS IoT SiteWise 用户指南》*中。
+ Amazon S3 中的对象。<a name="s3-export-destination"></a>您可以使用 Amazon S3 存储和检索大量的数据。有关更多信息，请参阅 *Amazon Simple Storage Service 开发人员指南*中的[什么是 Amazon S3？](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)。

## 流管理工作流
<a name="stream-manager-workflow"></a>

您的物联网应用通过 C AWS IoT Greengrass ore SDK 与流管理器进行交互。在简单工作流中，在 Greengrass 核心上运行的用户定义的 Lambda 函数会消耗 IoT 数据，如时间序列温度和压力指标。Lambda 函数可能会筛选或压缩数据，然后调用 AWS IoT Greengrass Core SDK 将数据写入流管理器中的流。流管理器可以根据为流定义的策略自动将流导出到 AWS 云 。用户定义的 Lambda 函数还可以将数据直接发送到本地数据库或存储库。

您的 IoT 应用程序可以包含多个用户定义的 Lambda 函数用于读取或写入流。这些本地 Lambda 函数可以读取和写入流，以便在本地过滤、聚合和分析数据。这样可以在数据从核心传输到云或本地目的地之前快速响应本地事件并提取有价值的信息。

下图显示了工作流程示例。

![\[流管理器工作流程图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/stream-manager-architecture.png)


要使用流管理器，请首先配置流管理器参数来定义组级别的运行时设置，以应用到 Greengrass 核心上的所有流中。这些可自定义设置允许您根据业务需求和环境约束控制流管理器存储、处理和导出流的方式。有关更多信息，请参阅 [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)。

配置流管理器后，您可以创建和部署 IoT 应用程序。这些通常是用户定义的 Lambda 函数，用于`StreamManagerClient`在 AWS IoT Greengrass Core SDK 中创建流并与之交互。在创建流期间，Lambda 函数会定义每个流的策略，例如导出目标、优先级和持久性。有关更多信息（包括 `StreamManagerClient` 操作的代码片段），请参阅[StreamManagerClient 用于处理直播](work-with-streams.md)。

有关配置简单工作流程的教程，请参阅[将数据流导出到 AWS 云 （控制台）](stream-manager-console.md)或[将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)。

## 要求
<a name="stream-manager-requirements"></a>

以下要求适用于使用流管理器：
+ 您必须使用 AWS IoT Greengrass 核心软件 v1.10 或更高版本，并启用直播管理器。有关更多信息，请参阅 [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)。

  <a name="stream-manager-not-supported-openwrt-para"></a> OpenWrt 发行版不支持直播管理器。
+ 核心设备上必须安装 Java 8 运行时 (JDK 8)。<a name="install-java8-runtime-general"></a>
  + 对于基于 Debian 的发行版（包括 Raspbian）或基于 Ubuntui 的发行版，运行以下命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 对于基于 Red Hat 的发行版（包括 Amazon Linux），请运行以下命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    有关更多信息，请参阅 OpenJDK 文档中的[如何下载并安装预先构建的 OpenJDK 程序包](https://openjdk.java.net/install/)。

   
+ 除了基本的 AWS IoT Greengrass 核心软件外，直播管理器还需要至少 70 MB 的内存。您的总内存需求取决于您的工作负载。

   
+ 用户定义的 Lambda 函数必须使用 [AWS IoT Greengrass 核心开发工具包](lambda-functions.md#lambda-sdks-core)与流管理器交互。 AWS IoT Greengrass Core SDK 有多种语言版本，但只有以下版本支持直播管理器操作：<a name="streammanagerclient-sdk-versions"></a>
  + Java SDK（v1.4.0 或更高版本）
  + Python SDK（v1.5.0 或更高版本）
  + Node.js SDK（v1.6.0 或更高版本）

  下载与 Lambda 函数运行时对应的开发工具包版本，并将其包含在 Lambda 函数部署包中。
**注意**  
适用于 Python 的 AWS IoT Greengrass 核心开发工具包需要 Python 3.7 或更高版本，并且还有其他软件包依赖关系。有关更多信息，请参阅[创建 Lambda 函数部署包（控制台）](stream-manager-console.md#stream-manager-console-create-deployment-package)或[创建 Lambda 函数部署包 (CLI)](stream-manager-cli.md#stream-manager-cli-create-deployment-package)。
+ 如果您为直播定义 AWS 云 导出目标，则必须创建导出目标并以 Greengrass 组角色授予访问权限。根据不同的目的地，也可能适用其他要求。有关更多信息，请参阅：<a name="export-destinations-links"></a>
  + [AWS IoT Analytics 频道](stream-export-configurations.md#export-to-iot-analytics)
  + [Amazon Kinesis data streams](stream-export-configurations.md#export-to-kinesis)
  + [AWS IoT SiteWise 资产属性](stream-export-configurations.md#export-to-iot-sitewise)
  + [Amazon S3 对象](stream-export-configurations.md#export-to-s3)

  您有责任维护这些 AWS 云 资源。

## 数据安全性
<a name="stream-manager-security"></a>

使用流管理器时，请注意以下安全注意事项。

### 本地数据安全性
<a name="stream-manager-security-stream-data"></a>

AWS IoT Greengrass 不加密核心设备上组件间的静态数据或本地传输的流数据。
+ **静态数据**。流数据存储在本地存储在 Greengrass 核心的存储目录中。为了确保数据安全， AWS IoT Greengrass 依赖于 Unix 文件权限和全盘加密（如果启用）。您可以使用可选的 [STREAM\$1MANAGER\$1STORE\$1ROOT\$1DIR](configure-stream-manager.md#STREAM_MANAGER_STORE_ROOT_DIR) 参数指定存储目录。如果稍后将此参数更改为使用其他存储目录，则 AWS IoT Greengrass 不会删除以前的存储目录或其内容。

   
+ **数据在本地传输**。 AWS IoT Greengrass 不会对数据源、Lambda 函数、Core SDK 和流管理器之间在 AWS IoT Greengrass 核心上传输的流数据进行加密。

   
+ **数据正在传输到 AWS 云**. 由流管理器导出到的数据流 AWS 云 使用带有传输层安全 (TLS) 的标准 AWS 服务客户端加密。

有关更多信息，请参阅 [数据加密](data-encryption.md)。

### 客户端身份验证
<a name="stream-manager-security-client-authentication"></a>

直播管理器客户端使用 AWS IoT Greengrass Core SDK 与直播管理器通信。启用客户端身份验证后，只有 Greengrass 组中的 Lambda 函数才能与流管理器中的流交互。禁用客户端身份验证时，Greengrass 核心上运行的任何进程（如 [Docker 容器](docker-app-connector.md)）都可以与流管理器中的流进行交互。只有在您的业务案例需要时才应禁用身份验证。

您可以使用 [STREAM\$1MANAGER\$1AUTHENTICATE\$1CLIENT](configure-stream-manager.md#STREAM_MANAGER_AUTHENTICATE_CLIENT) 参数来设置客户端身份验证模式。您可以通过控制台或 AWS IoT Greengrass API 配置此参数。更改在部署组后生效。


****  

|   | 已启用 | 已禁用 | 
| --- | --- | --- | 
| 参数值 | `true`（默认值和推荐值） | `false` | 
| 允许的客户端 | Greengrass 组中的用户定义 Lambda 函数 | Greengrass 组中的用户定义 Lambda 函数 Greengrass 核心设备上运行的其他进程 | 

## 另请参阅
<a name="stream-manager-see-also"></a>
+ [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)
+ [StreamManagerClient 用于处理直播](work-with-streams.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ [将数据流导出到 AWS 云 （控制台）](stream-manager-console.md)
+ [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)

# 配置 AWS IoT Greengrass 直播管理器
<a name="configure-stream-manager"></a>

在 AWS IoT Greengrass 核心上，流管理器可以存储、处理和导出物联网设备数据。流管理器提供用于配置组级运行时设置的参数。这些设置适用于 Greengrass 核心上的所有流。您可以使用 AWS IoT 控制台或 AWS IoT Greengrass API 来配置直播管理器设置。更改在部署组后生效。

**注意**  
配置流管理器后，您可以创建和部署在 Greengrass 核心上运行并与流管理器交互的 IoT 应用程序。这些 IoT 应用程序通常是用户定义的 Lambda 函数。有关更多信息，请参阅 [StreamManagerClient 用于处理直播](work-with-streams.md)。

## 流管理器参数
<a name="stream-manager-parameters"></a>

流管理器提供以下允许您定义组级别设置的参数。所有参数都是可选的。

**存储目录**  <a name="STREAM_MANAGER_STORE_ROOT_DIR"></a>
参数名称: `STREAM_MANAGER_STORE_ROOT_DIR`  
用于存储流的本地目录的绝对路径。此值必须以正斜杠开头（例如， `/data`）。  
有关保护流数据安全的信息，请参阅[本地数据安全性](stream-manager.md#stream-manager-security-stream-data)。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**服务器端口**  
参数名称: `STREAM_MANAGER_SERVER_PORT`  
用于与流管理器通信的本地端口号。默认值为 `8088`。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**验证客户端身份**  <a name="STREAM_MANAGER_AUTHENTICATE_CLIENT"></a>
参数名称: `STREAM_MANAGER_AUTHENTICATE_CLIENT`  
指示客户端是否必须通过身份验证才能与流管理器交互。客户端和直播管理器之间的所有交互均由 AWS IoT Greengrass Core SDK 控制。此参数确定哪些客户端可以调用 AWS IoT Greengrass 核心开发工具包来处理流。有关更多信息，请参阅 [客户端身份验证](stream-manager.md#stream-manager-security-client-authentication)。  
有效值为 `true` 或 `false`。默认值为 `true`（推荐）。  
+ `true`。仅允许 Greengrass Lambda 函数作为客户端。Lambda 函数客户端使用内部 AWS IoT Greengrass 核心协议通过核心 SDK 进行身份验证。 AWS IoT Greengrass 
+ `false`。 允许在 AWS IoT Greengrass 核心上运行的任何进程成为客户端。除非您的业务案例需要，否则请勿设置为 `false`。例如，仅当核心设备上的非 Lambda 进程必须直接与流管理器（例如在核心上运行的 [Docker 容器](docker-app-connector.md)）通信时，才将此值设置为 `false`。
最低 AWS IoT Greengrass 核心版本：1.10.0

**最大带宽**  
参数名称: `STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH`  
可用于导出数据的平均最大带宽（以千位/秒为单位）。默认设置允许无限制使用可用带宽。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**线程池大小**  
参数名称: `STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE`  
可用于导出数据的最大活动线程数。默认值为 `5`。  
最佳大小取决于您的硬件、流的量和计划的导出流数量。如果导出速度较慢，您可以调整此设置以找出适合您的硬件和业务案例的最佳大小。核心设备硬件的 CPU 和内存是限制因素。首先，您可以尝试将此值设置为等于设备上的处理器核心数。  
请注意，不要设置大于硬件可以支持的大小。每个流都会消耗硬件资源，因此您应该尝试限制受约束设备上的导出流的数量。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**JVM 参数**  
参数名称: `JVM_ARGS`  
在启动时传递给流管理器的自定义 Java 虚拟机参数。多个参数应该用空格分隔。  
仅当您必须覆盖 JVM 使用的默认设置时才使用此参数。例如，如果计划导出大量的流，则可能需要增加默认堆大小。  
最低 AWS IoT Greengrass 核心版本：1.10.0

**只读输入文件目录**  <a name="stream-manager-read-only-directories"></a>
参数名称: `STREAM_MANAGER_READ_ONLY_DIRS`  
以逗号分隔的列表，列出了根文件系统以外存储输入文件的目录的绝对路径。流管理器读取文件并将其上传到 Amazon S3，并将这些目录挂载为只读。有关更多关于导出至 Amazon S3 的信息，请参阅 [Amazon S3 对象](stream-export-configurations.md#export-to-s3)。  
仅当满足以下条件时才使用此参数：  
+ 导出到 Amazon S3 的流的输入文件目录位于以下位置之一：
  + 根文件系统以外的分区。
  + 在根文件系统中的 `/tmp` 下。
+ Greengrass 组的[默认容器化](lambda-group-config.md#lambda-containerization-groupsettings)是 **Greengrass 容器**。
示例值：`/mnt/directory-1,/mnt/directory-2,/tmp`  
最低 AWS IoT Greengrass 核心版本：1.11.0

**分段上传的最小大小**  <a name="stream-manager-minimum-part-size"></a>
参数名称: `STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES`  
向 Amazon S3 进行分段上传的分段最小大小（以字节为单位）。流管理器使用此设置和输入文件的大小来确定如何对多部分 PUT 请求中的数据进行批处理。默认最小值为 `5242880` 字节 (5 MB)。  
流管理器使用流的 `sizeThresholdForMultipartUploadBytes` 属性来确定是以单段上传还是分段上传的形式导出到 Amazon S3。用户定义的 Lambda 函数在创建导出到 Amazon S3 的流时会设置此阈值。默认阈值为 5 MB。
最低 AWS IoT Greengrass 核心版本：1.11.0

## 配置流管理器设置（控制台）
<a name="configure-stream-manager-console"></a>

您可以使用 AWS IoT 控制台执行以下管理任务：
+ [检查是否已启用流管理器](#check-stream-manager-console)
+ [在组创建过程中启用或禁用流管理器](#enable-stream-manager-console-new-group)
+ [为现有组启用或禁用流管理器](#enable-stream-manager-console-existing-group)
+ [更改流管理器设置](#change-stream-manager-console)

更改将在部署 Greengrass 组后生效。请参阅[将数据流导出到 AWS 云 （控制台）](stream-manager-console.md)，其中的教程演示了如何部署一个包含可与流管理器交互的 Lambda 函数的 Greengrass 组。

**注意**  <a name="ggstreammanager-function-config-console"></a>
使用控制台启用流管理器并部署组时，流管理器的内存大小默认设置为 4194304 KB (4 GB)。建议您将内存大小设置为至少 128000 KB。

 

### 检查流管理器是否已启用（控制台）
<a name="check-stream-manager-console"></a>

1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

1. <a name="group-choose-target-group"></a>选择目标组。

1. 选择 **Lambda 函数选项卡**。

1. 在**系统 Lambda 函数**下，选择**流管理器**，然后选择**编辑**。

1. 检查启用或禁用状态。还会显示所有配置的自定义流管理器设置。

 

### 在组创建过程中启用或禁用流管理器（控制台）
<a name="enable-stream-manager-console-new-group"></a>

1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

1. 选择**创建组**。您在下一页上的选择决定了如何为组配置流管理器。

1. 继续完成**命名您的组**，然后选择 **Greengrass 核心**页面。

1. 选择**创建群组**。

1. 在组配置页面上，选择 **Lambda 函数**选项卡，选择**流管理器**，然后选择**编辑**。
   + 要使用默认设置启用流管理器，请选择 **启用默认设置**。

      
   + 要使用自定义设置启用流管理器，请选择 **Customize settings (自定义设置)**。

     1. 在**配置流管理器**页面上，选择**启用自定义设置**。

     1. 在 **Custom settings (自定义设置)** 下，输入流管理器参数的值。有关更多信息，请参阅 [流管理器参数](#stream-manager-parameters)。将字段留空 AWS IoT Greengrass 以允许使用其默认值。

         
   + 要禁用直播管理器，请选择**禁用**。

     1. 在 **Configure stream manager (配置流管理器)** 页面上，选择 **Disable (禁用)**。

         

1. 选择**保存**。

1. <a name="continue-create-group"></a>继续浏览剩余页面以创建您的组。

1. 在**客户端设备**页面上，下载安全资源，查看信息，然后选择**完成**。
**注意**  
启用流管理器后，必须在核心设备上[安装 Java 8 运行时](stream-manager.md#stream-manager-requirements)，然后再部署组。

 

### 为现有组启用或禁用流管理器（控制台）
<a name="enable-stream-manager-console-existing-group"></a>

1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

1. <a name="group-choose-target-group"></a>选择目标组。

1. 选择 **Lambda 函数选项卡**。

1. 在**系统 Lambda 函数**下，选择**流管理器**，然后选择**编辑**。

1. 检查启用或禁用状态。还会显示所有配置的自定义流管理器设置。

 

### 更改流管理器设置（控制台）
<a name="change-stream-manager-console"></a>

1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

1. <a name="group-choose-target-group"></a>选择目标组。

1. 选择 **Lambda 函数选项卡**。

1. 在**系统 Lambda 函数**下，选择**流管理器**，然后选择**编辑**。

1. 检查启用或禁用状态。还会显示所有配置的自定义流管理器设置。

1. 选择**保存**。

## 配置流管理器设置 (CLI)
<a name="configure-stream-manager-cli"></a>

在中 AWS CLI，使用系统 `GGStreamManager` Lambda 函数配置流管理器。系统 Lambda 函数是 AWS IoT Greengrass 核心软件的组件。对于流管理器和其他一些系统 Lambda 函数，您可以通过管理 Greengrass 组中的相应 `Function` 和 `FunctionDefinitionVersion` 对象来配置 Greengrass 功能。有关更多信息，请参阅 [AWS IoT Greengrass 群组对象模型概述](deployments.md#api-overview)。

您可以使用 API 执行以下管理任务。本节中的示例说明了如何使用 AWS CLI，但您也可以直接调用 AWS IoT Greengrass API 或使用 S AWS DK。
+ [检查是否已启用流管理器](#check-stream-manager-cli)
+ [启用、禁用或配置流管理器](#enable-stream-manager-cli)

更改在部署组后生效。请参阅 [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)，其中的教程演示了如何部署一个包含可与流管理器交互的 Lambda 函数的 Greengrass 组。

**提示**  
要查看核心设备流管理器是否启用并正在运行，您可以在设备上的终端中运行以下命令。  

```
ps aux | grep -i 'streammanager'
```

 

### 检查流管理器是否启用 (CLI)
<a name="check-stream-manager-cli"></a>

如果部署的函数定义版本包含系统 `GGStreamManager` Lambda 函数，则启用了流管理器。要进行检查，请执行以下操作;

1. <a name="get-group-id-latestversion"></a>获取目标 Greengrass 群 IDs 组和群组版本。此过程假定这是最新的组和组版本。以下查询将返回最近创建的组。

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   或者，您也可以按名称查询。系统不要求组名称是唯一的，所以可能会返回多个组。

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**注意**  
<a name="find-group-ids-console"></a>您也可以在 AWS IoT 控制台中找到这些值。组 ID 显示在组的**设置**页面上。群组版本显示 IDs 在群组的 “**部署**” 选项卡上。

1. <a name="copy-group-id-latestversion"></a>从输出中的目标组复制 `Id` 和 `LatestVersion` 值。

1. <a name="get-latest-group-version"></a>获取最新的组版本。
   + 将 *group-id* 替换为复制的 `Id`。
   + 将 *latest-group-version-id* 替换为复制的 `LatestVersion`。

   ```
   aws greengrass get-group-version \
   --group-id group-id \
   --group-version-id latest-group-version-id
   ```

1. 从输出`FunctionDefinitionVersionArn`中，获取函数定义和函数定义版本的。 IDs 
   + 函数定义 ID 是 Amazon 资源名称（ARN）中 `functions` 段后面的 GUID。
   + 函数定义版本 ID 是 ARN 中 `versions` 段后面的 GUID。

   ```
   arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/functions/function-definition-id/versions/function-definition-version-id
   ```

1. 获取函数定义版本。
   + *function-definition-id*替换为函数定义 ID。
   + *function-definition-version-id*替换为函数定义版本 ID。

   ```
   aws greengrass get-function-definition-version \
   --function-definition-id function-definition-id \
   --function-definition-version-id function-definition-version-id
   ```

如果输出中的 `functions` 数组包含 `GGStreamManager` 函数，则启用了流管理器。为函数定义的任何环境变量都表示流管理器的自定义设置。

### 启用、禁用或配置流管理器 (CLI)
<a name="enable-stream-manager-cli"></a>

在中 AWS CLI，使用系统 `GGStreamManager` Lambda 函数配置流管理器。更改在部署组后生效。
+ 要启用流管理器，请在函数定义版本的 `functions` 数组中包含 `GGStreamManager`。要配置自定义设置，请为相应的[流管理器参数](#stream-manager-parameters)定义环境变量。
+ 要禁用流管理器，请从函数定义版本的 `functions` 数组中删除 `GGStreamManager`。

**带默认设置的流管理器**  
以下示例配置使用默认设置启用流管理器。它将任意函数 ID 设置为 `streamManager`。  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```
对于 `FunctionConfiguration` 属性，您可能知道以下内容：  
+ 在默认设置下，`MemorySize` 设置为 4194304 KB (4 GB)。您可随时更改此值。建议您将 `MemorySize` 设置为至少 128000 KB。
+ `Pinned` 必须设置为 `true`。
+ `Timeout` 是函数定义版本所需的，但 `GGStreamManager` 不使用它。

**带自定义设置的流管理器**  <a name="enable-stream-manager-custom-settings"></a>
以下示例配置使用针对存储目录、服务器端口和线程池大小参数的自定义值来启用流管理器。  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "Environment": {
            "Variables": {
                "STREAM_MANAGER_STORE_ROOT_DIR": "/data",
                "STREAM_MANAGER_SERVER_PORT": "1234",
                "STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE": "4"
            }
        },
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```
AWS IoT Greengrass 对未指定为环境变量的[流管理器参数](#stream-manager-parameters)使用默认值。

**带有 Amazon S3 自定义导出设置的流管理器**  <a name="enable-stream-manager-custom-settings-s3"></a>
以下示例配置为流管理器启用了上传目录的自定义值和最小分段上传大小参数。  

```
{
    "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
    "FunctionConfiguration": {
        "Environment": {
            "Variables": {
                "STREAM_MANAGER_READ_ONLY_DIRS": "/mnt/directory-1,/mnt/directory-2,/tmp",
                "STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES": "10485760"
            }
        },
        "MemorySize": 4194304,
        "Pinned": true,
        "Timeout": 3
    },
    "Id": "streamManager"
}
```

 

**启用、禁用或配置流管理器 (CLI)**

1. <a name="get-group-id-latestversion"></a>获取目标 Greengrass 群 IDs 组和群组版本。此过程假定这是最新的组和组版本。以下查询将返回最近创建的组。

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   或者，您也可以按名称查询。系统不要求组名称是唯一的，所以可能会返回多个组。

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**注意**  
<a name="find-group-ids-console"></a>您也可以在 AWS IoT 控制台中找到这些值。组 ID 显示在组的**设置**页面上。群组版本显示 IDs 在群组的 “**部署**” 选项卡上。

1. <a name="copy-group-id-latestversion"></a>从输出中的目标组复制 `Id` 和 `LatestVersion` 值。

1. <a name="get-latest-group-version"></a>获取最新的组版本。
   + 将 *group-id* 替换为复制的 `Id`。
   + 将 *latest-group-version-id* 替换为复制的 `LatestVersion`。

   ```
   aws greengrass get-group-version \
   --group-id group-id \
   --group-version-id latest-group-version-id
   ```

1.  ARNs从输出中复制`CoreDefinitionVersionArn`和所有其他版本，除`FunctionDefinitionVersionArn`了。在创建组版本时，将使用这些值。

1. <a name="parse-function-def-id"></a>在输出的 `FunctionDefinitionVersionArn` 中，复制函数定义的 ID。该 ID 是 ARN 中的 `functions` 段后面的 GUID，如以下示例所示。

   ```
   arn:aws:greengrass:us-west-2:123456789012:/greengrass/definition/functions/bcfc6b49-beb0-4396-b703-6dEXAMPLEcu5/versions/0f7337b4-922b-45c5-856f-1aEXAMPLEsf6
   ```
**注意**  
或者，您可以通过运行 [https://docs.aws.amazon.com/cli/latest/reference/greengrass/create-function-definition.html](https://docs.aws.amazon.com/cli/latest/reference/greengrass/create-function-definition.html) 命令以创建一个函数定义，然后从输出中复制该 ID。

1. <a name="enable-stream-manager-function-definition-version"></a>在函数定义中添加一个函数定义版本。
   + *function-definition-id*替换`Id`为你为函数定义复制的。
   + 在 `functions` 数组中，包括要在 Greengrass 核心上提供的所有其他函数。您可以使用 `get-function-definition-version` 命令获取现有函数的列表。

      
**使用默认设置启用流管理器**  
以下示例通过在 `functions` 数组中包含 `GGStreamManager` 函数来启用流管理器。此示例使用[流管理器参数](#stream-manager-parameters)的默认值。  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
               "FunctionConfiguration": {
                   "MemorySize":  4194304,
                   "Pinned": true,
                   "Timeout": 3
               },
               "Id": "streamManager"
           },
           {    
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
       ]
   }'
   ```
示例中的 `myLambdaFunction` 函数表示用户定义的 Lambda 函数之一。  
**使用自定义设置启用流管理器**  
以下示例通过在 `functions` 数组中包含 `GGStreamManager` 函数来启用流管理器。除非要更改默认值，否则所有流管理器设置都是可选的。此示例演示如何使用环境变量来设置自定义值。  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1",
               "FunctionConfiguration": {
                   "Environment": {
                       "Variables": {
                           "STREAM_MANAGER_STORE_ROOT_DIR": "/data",
                           "STREAM_MANAGER_SERVER_PORT": "1234",
                           "STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE": "4"
                       }
                   },
                   "MemorySize":  4194304,
                   "Pinned": true,
                   "Timeout": 3
               },
               "Id": "streamManager"
           },
           {    
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
       ]
   }'
   ```
对于 `FunctionConfiguration` 属性，您可能知道以下内容：  
   + 在默认设置下，`MemorySize` 设置为 4194304 KB (4 GB)。您可随时更改此值。建议您将 `MemorySize` 设置为至少 128000 KB。
   + `Pinned` 必须设置为 `true`。
   + `Timeout` 是函数定义版本所需的，但 `GGStreamManager` 不使用它。  
**禁用流管理器**  
以下示例省略了用于禁用流管理器的 `GGStreamManager` 函数。  

   ```
   aws greengrass create-function-definition-version \
   --function-definition-id function-definition-id \
   --functions '[
           {       
               "FunctionArn": "arn:aws:lambda:us-west-2:123456789012:function:MyLambdaFunction:MyAlias",
               "FunctionConfiguration": {
                   "Executable": "myLambdaFunction.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               },
               "Id": "myLambdaFunction"
           },
           ... more user-defined functions
       ]
   }'
   ```
如果不需要部署任何 Lambda 函数，则可以完全省略函数定义版本。

1. <a name="copy-function-def-version-arn"></a>从输出中复制函数定义版本的 `Arn`。

1. <a name="create-group-version-with-sys-lambda"></a>创建一个包含系统 Lambda 函数的组版本。
   + *group-id*替换`Id`为该群组的。
   + *core-definition-version-arn*替换为您从最新群组版本中复制的版本。`CoreDefinitionVersionArn`
   + *function-definition-version-arn*替换`Arn`为你为新函数定义版本复制的。
   + 替换您从最新组版本中复制的其他组组件（例如`SubscriptionDefinitionVersionArn`或`DeviceDefinitionVersionArn`）。 ARNs 
   + 删除任何未使用的参数。例如，如果组版本不包含任何资源，请删除 `--resource-definition-version-arn`。

   ```
   aws greengrass create-group-version \
   --group-id group-id \
   --core-definition-version-arn core-definition-version-arn \
   --function-definition-version-arn function-definition-version-arn \
   --device-definition-version-arn device-definition-version-arn \
   --logger-definition-version-arn logger-definition-version-arn \
   --resource-definition-version-arn resource-definition-version-arn \
   --subscription-definition-version-arn subscription-definition-version-arn
   ```

1. <a name="copy-group-version-id"></a>从输出中复制 `Version`。这是新组版本的 ID。

1. <a name="create-group-deployment"></a>用新组版本替换组。
   + *group-id*替换为你为`Id`该群组复制的。
   + *group-version-id*替换为你`Version`为新群组版本复制的版本。

   ```
   aws greengrass create-deployment \
   --group-id group-id \
   --group-version-id group-version-id \
   --deployment-type NewDeployment
   ```

 

如果您想稍后再次编辑流管理器设置，请按照以下步骤进行操作。请确保创建一个函数定义版本，其中包括具有更新配置的 `GGStreamManager` 函数。组版本必须引用 ARNs 要部署到核心的所有组件版本。更改在部署组后生效。

## 另请参阅
<a name="configure-stream-manager-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的数据流](stream-manager.md)
+ [StreamManagerClient 用于处理直播](work-with-streams.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ [将数据流导出到 AWS 云 （控制台）](stream-manager-console.md)
+ [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)

# StreamManagerClient 用于处理直播
<a name="work-with-streams"></a>

在 AWS IoT Greengrass 核心上运行的用户定义的 Lambda 函数可以使用 [AWS IoT Greengrass Core SDK](lambda-functions.md#lambda-sdks) 中的`StreamManagerClient`对象在流[管理器中创建流，然后与流](stream-manager.md)交互。当 Lambda 函数创建流时，它会定义流的 AWS 云 目标、优先级以及其他导出和数据保留策略。要将数据发送到流管理器，Lambda 函数会将数据附加到流中。如果为流定义了导出目标，流管理器会自动导出流。

**注意**  
<a name="stream-manager-clients"></a>通常，流管理器的客户端是用户定义的 Lambda 函数。如果您的业务案例需要它，您也可以允许在 Greengrass 核心上运行的非 Lambda 进程（例如 Docker 容器）与流管理器交互。有关更多信息，请参阅 [客户端身份验证](stream-manager.md#stream-manager-security-client-authentication)。

本主题中的代码段向您展示客户端如何调用 `StreamManagerClient` 方式处理流。有关方法及其参数的实现详细信息，请使用指向每个代码片段后面列出的开发工具包参考的链接。有关使用完整 Python Lambda 函数的教程，请参阅 [将数据流导出到 AWS 云 （控制台）](stream-manager-console.md) 或 [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)。

您的 Lambda 函数应该在函数处理程序之外实例化 `StreamManagerClient`。如果在处理程序中进行实例化，该函数每次被调用时都会创建一个 `client` 并连接到流管理器。

**注意**  
如果在处理程序中实例化 `StreamManagerClient`，则必须在 `client` 完成其工作时显式调用 `close()` 方法。否则，`client` 会保持连接打开，并且另一个线程一直运行，直到脚本退出。

`StreamManagerClient` 支持以下操作：
+ [创建消息流](#streammanagerclient-create-message-stream)
+ [附加消息](#streammanagerclient-append-message)
+ [读取消息](#streammanagerclient-read-messages)
+ [列出流](#streammanagerclient-list-streams)
+ [描述消息流](#streammanagerclient-describe-message-stream)
+ [更新消息流](#streammanagerclient-update-message-stream)
+ [删除消息流](#streammanagerclient-delete-message-stream)

## 创建消息流
<a name="streammanagerclient-create-message-stream"></a>

要创建流，用户定义的 Lambda 函数会调用 create 方法并传入一个 `MessageStreamDefinition` 对象。此对象指定流的唯一名称，并定义当达到最大流大小时，流管理器应如何处理新数据。您可以使用 `MessageStreamDefinition` 及其数据类型（如 `ExportDefinition`、`StrategyOnFull` 和 `Persistence`）来定义其他流属性。这些方法包括：
+ 自动导出的目标 AWS IoT Analytics Kinesis Data Stream AWS IoT SiteWise s 和 Amazon S3 目的地。有关更多信息，请参阅 [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)。
+ 导出优先级。流管理器先导出优先级较高的流，然后导出优先级较低的流。
+ Kinesis Data Streams 和 AWS IoT Analytics目标的最大批处理大小 AWS IoT SiteWise 和批处理间隔。当满足任一条件时，流管理器导出消息。
+ Time-to-live (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。
+ 流持久性。选择将流保存到文件系统，以便在核心重新启动期间保留数据或将流保存在内存中。
+ 起始序列号。指定要在导出中用作起始消息的消息的序列号。

有关 `MessageStreamDefinition` 的更多信息，请参阅目标语言的开发工具包参考：
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)在 Java 开发工具包中
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)在 Node.js 软件开发工具包中
+ [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)在 Python 软件开发工具包中

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定，或不支持在生产环境中使用。

创建流后，您的 Lambda 函数可以[将消息附加](#streammanagerclient-append-message)到流中以发送数据以供导出，并从流中[读取消息](#streammanagerclient-append-message)以进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为 AWS IoT Analytics 或 Kinesis 数据流中的每个目标频道创建一个流，但您可以为一个流定义多个目标。流具有持久的使用寿命。

### 要求
<a name="streammanagerclient-create-message-stream-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

**注意**  
使用 AWS IoT SiteWise 或 Amazon S3 导出目标创建直播需要满足以下要求：  
<a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a> AWS IoT Greengrass 核心 SDK 最低版本：Python：1.6.0 \$1 Java：1.5.0 \$1 Node.js：1.7.0

### 示例
<a name="streammanagerclient-create-message-stream-examples"></a>

以下代码段创建一个名为 `StreamName` 的流。它定义了 `MessageStreamDefinition` 中的流属性和从属的数据类型。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS 云.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[创建消息流 \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.create_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)  // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)  // Default is 16 MB.
                    .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                    .withPersistence(Persistence.File)  // Default is File.
                    .withFlushOnWrite(false)  // Default is false.
                    .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 云.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3TaskExecutor(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-)\$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456)  // Default is 256 MB.
                .withStreamSegmentSize(16777216)  // Default is 16 MB.
                .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                .withPersistence(Persistence.File)  // Default is File.
                .withFlushOnWrite(false)  // Default is false.
                .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 云.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                        .withIotSitewise(null)
                        .withS3TaskExecutor(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream)\$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

有关配置导出目标的更多信息，请参阅[导出支持的 AWS 云 目标的配置](stream-export-configurations.md)。

 

## 附加消息
<a name="streammanagerclient-append-message"></a>

要将数据发送到流管理器进行导出，您的 Lambda 函数会将数据附加到目标流。导出目标决定要传递给此方法的数据类型。

### 要求
<a name="streammanagerclient-append-message-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

**注意**  
添加带有 AWS IoT SiteWise 或 Amazon S3 导出目标的邮件需要满足以下要求：  
<a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a> AWS IoT Greengrass 核心 SDK 最低版本：Python：1.6.0 \$1 Java：1.5.0 \$1 Node.js：1.7.0

### 示例
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics 或 Kinesis Data Streams 导出目的地
<a name="streammanagerclient-append-message-blob"></a>

以下代码段将消息附加到名为 `StreamName` 的流。对于我们 AWS IoT Analytics 的 Kinesis Data Streams 目标，你的 Lambda 函数会附加一个数据块。

此代码段具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[append\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js 开发工具包参考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise 出口目的地
<a name="streammanagerclient-append-message-sitewise"></a>

以下代码段将消息附加到名为 `StreamName` 的流。对于 AWS IoT SiteWise 目标，您的 Lambda 函数会附加一个序列化对象。`PutAssetPropertyValueEntry`有关更多信息，请参阅 [导出到 AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>当您向发送数据时 AWS IoT SiteWise，您的数据必须满足`BatchPutAssetPropertyValue`操作的要求。有关更多信息，请参阅《AWS IoT SiteWise API Reference》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

此代码段具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a> AWS IoT Greengrass 核心 SDK 最低版本：Python：1.6.0 \$1 Java：1.5.0 \$1 Node.js：1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[追加消息 \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.PutAssetPropertyValueEntry)](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK 参考：[附加消息 \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =  new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[追加消息 \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### Amazon S3 导出目标
<a name="streammanagerclient-append-message-export-task"></a>

以下代码段将导出任务附加到名为 `StreamName` 的流。对于 Amazon S3 目标，您的 Lambda 函数会附加一个序列化 `S3ExportTaskDefinition` 对象，其中包含有关源输入文件和目标 Amazon S3 对象的信息。如果指定的对象不存在，流管理器会为您创建。有关更多信息，请参阅 [导出到 Amazon S3](stream-export-configurations.md#export-streams-to-s3)。

此代码段具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a> AWS IoT Greengrass 核心 SDK 最低版本：Python：1.6.0 \$1 Java：1.5.0 \$1 Node.js：1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

[Python SDK 参考：[追加消息 \$1 S3](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

[Java SDK 参考：[附加消息 \$1](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) S3 ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

[Node.js SDK 参考：[追加消息 \$1](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) S3 ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

 

## 读取消息
<a name="streammanagerclient-read-messages"></a>

从流读取消息。

### 要求
<a name="streammanagerclient-read-messages-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

### 示例
<a name="streammanagerclient-read-messages-examples"></a>

以下代码段读取名为 `StreamName` 的流中的消息。read 方法接受一个可选 `ReadMessagesOptions` 对象，该对象指定要开始读取的序列号、要读取的最小数量和最大数量以及读取消息的超时。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[阅读消息 \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.ReadMessagesOptions)](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[阅读消息 \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[阅读消息 \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages)

------

 

## 列出流
<a name="streammanagerclient-list-streams"></a>

在流管理器中获取流列表。

### 要求
<a name="streammanagerclient-list-streams-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

### 示例
<a name="streammanagerclient-list-streams-examples"></a>

以下代码段获取流管理器中的流列表（按名称）。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[list\$1streams](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[listStreams](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js 开发工具包参考：[listStreams](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

 

## 描述消息流
<a name="streammanagerclient-describe-message-stream"></a>

获取有关流的元数据，包括流定义、大小和导出状态。

### 要求
<a name="streammanagerclient-describe-message-stream-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

### 示例
<a name="streammanagerclient-describe-message-stream-examples"></a>

以下代码段获取有关名为 `StreamName` 的流的元数据，包括流的定义、大小和导出程序状态。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[describe\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

 

## 更新消息流
<a name="streammanagerclient-update-message-stream"></a>

更新现有流的属性。如果流创建后您的要求发生变化，则可能需要更新流。例如：
+ 为 AWS 云 目标添加新的[导出配置](stream-export-configurations.md)。
+ 增加流的最大大小以更改数据的导出或保留方式。例如，将流大小与您在完整设置下的策略相结合，可能会导致数据在流管理器处理之前被删除或拒绝。
+ 暂停然后恢复导出；例如，如果导出任务运行时间较长，而您想对上传数据进行配给。

您的 Lambda 函数遵循以下高级流程来更新流：

1. [获取流的描述。](#streammanagerclient-describe-message-stream)

1. 更新相应 `MessageStreamDefinition` 和从属对象的目标属性。

1. 传入更新后的 `MessageStreamDefinition`。请务必包含更新后的流的完整对象定义。未定义属性将恢复为默认值。

   可以指定要在导出中用作起始消息的消息的序列号。

### 要求
<a name="-streammanagerclient-update-message-streamreqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a> AWS IoT Greengrass 核心 SDK 最低版本：Python：1.6.0 \$1 Java：1.5.0 \$1 Node.js：1.7.0

### 示例
<a name="streammanagerclient-update-message-stream-examples"></a>

以下代码段更新名为 `StreamName` 的流。它会更新导出到 Kinesis Data Streams 的流的多个属性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(  
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.update_message_stream)\$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)  // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS 云.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK 参考：[update](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) \$1message\$1stream [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)  // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)  // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)  // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)  // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)  // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)  // Default is false. Updating to true.
                .withExportDefinition(  
                    // Optional. Choose where/how the stream is exported to the AWS 云.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream)\$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### 更新流时的限制
<a name="streammanagerclient-update-constraints"></a>

更新流时，有以下限制。除非在以下列表中注明，否则更新会立即生效。
+ 无法更新流的持久性。要更改此行为，[请删除该流](#streammanagerclient-delete-message-stream)然后[创建一个流](#streammanagerclient-create-message-stream)以定义新的持久性策略。
+ 只有在以下条件下，您才能更新流的最大大小：
  + 最大大小必须大于或等于流的当前大小。<a name="messagestreaminfo-describe-stream"></a>要查找此信息，请[描述流](#streammanagerclient-describe-message-stream)，然后检查返回的 `MessageStreamInfo` 对象的存储状态。
  + 最大大小必须大于或等于流的段大小。
+ 可以将流的段大小更新为小于流的最大大小的值。更新的设置将应用于新的段。
+ 生存时间（TTL）属性的更新将应用于新的追加操作。如果您减小此值，流管理器也可能会删除超过 TTL 的现有段。
+ 对全属性策略的更新将应用于新的追加操作。如果您将策略设置为覆盖最旧的数据，则流管理器还可能根据新设置覆盖现有段。
+ 写入时刷新属性的更新将应用于新消息。
+ 导出配置的更新将应用于新的导出。更新请求必须包含您想要支持的所有导出配置。否则，流管理器会将其删除。
  + 更新导出配置时，请指定目标导出配置的标识符。
  + 要添加导出配置，请为新的导出配置指定唯一标识符。
  + 要删除导出配置，请省略导出配置。
+ 要[更新](#streammanagerclient-update-message-stream)流中导出配置的起始序列号，必须指定一个小于最新序列号的值。<a name="messagestreaminfo-describe-stream"></a>要查找此信息，请[描述流](#streammanagerclient-describe-message-stream)，然后检查返回的 `MessageStreamInfo` 对象的存储状态。

 

## 删除消息流
<a name="streammanagerclient-delete-message-stream"></a>

删除流。删除流时，流的所有存储数据将从磁盘中删除。

### 要求
<a name="streammanagerclient-delete-message-stream-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a> AWS IoT Greengrass 核心软件开发工具包最低版本：Python：1.5.0 \$1 Java：1.4.0 \$1 Node.js：1.6.0

### 示例
<a name="streammanagerclient-delete-message-stream-examples"></a>

以下代码段删除名为 `StreamName` 的流。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[delete\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## 另请参阅
<a name="work-with-streams-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的数据流](stream-manager.md)
+ [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ [将数据流导出到 AWS 云 （控制台）](stream-manager-console.md)
+ [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)
+ `StreamManagerClient`在核 AWS IoT Greengrass 心 SDK 参考中：
  + [Python](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html)
  + [Java](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# 导出支持的 AWS 云 目标的配置
<a name="stream-export-configurations"></a>

C AWS IoT Greengrass ore SDK `StreamManagerClient` 中使用用户定义的 Lambda 函数与流管理器进行交互。当 Lambda 函数[创建流](work-with-streams.md#streammanagerclient-create-message-stream)或[更新流](work-with-streams.md#streammanagerclient-create-message-stream)时，它会传递一个表示流属性的 `MessageStreamDefinition` 对象，包括导出定义。`ExportDefinition` 对象包含为流定义的导出配置。流管理器使用这些导出配置来确定将流导出到何处以及如何导出。

![\[ExportDefinition 属性类型的对象模型图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/stream-manager-exportconfigs.png)


您可以为一个流定义零个或多个导出配置，包括针对单个目标类型的多个导出配置。例如，您可以将一个流导出到两个 AWS IoT Analytics 通道和一个 Kinesis 数据流。

对于失败的导出尝试，流管理器会持续重试将数据导出到 AWS 云 ，间隔不超过五分钟。重试次数没有最大限制。

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定，或不支持在生产环境中使用。

**Topics**
+ [AWS IoT Analytics 频道](#export-to-iot-analytics)
+ [Amazon Kinesis data streams](#export-to-kinesis)
+ [AWS IoT SiteWise 资产属性](#export-to-iot-sitewise)
+ [Amazon S3 对象](#export-to-s3)

您负责维护这些 AWS 云 资源。

## AWS IoT Analytics 频道
<a name="export-to-iot-analytics"></a>

直播管理器支持自动导出到 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 允许您对数据进行高级分析，以帮助做出业务决策和改进机器学习模型。有关更多信息，请参阅[什么是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) 在《*AWS IoT Analytics 用户指南》*中。

在 AWS IoT Greengrass 核心软件开发工具包中，您的 Lambda 函数使用`IoTAnalyticsConfig`来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python SDK 中的 [Io TAnalytics Config](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTAnalyticsConfig)
+ Java SDK 中的 [Io TAnalytics Config](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html)
+ Node.js SDK 中的 [Io TAnalytics Config](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html)

### 要求
<a name="export-to-iot-analytics-reqs"></a>

此导出目的地具有以下要求：
+ 中的目标频道 AWS IoT Analytics 必须与 Greengrass 群组相同 AWS 账户 。 AWS 区域 
+ [Greengrass 组角色](group-role.md) 必须允许对目标通道的 `iotanalytics:BatchPutMessage` 权限。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "iotanalytics:BatchPutMessage"
              ],
              "Resource": [
              "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
      "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

要创建导出到的流 AWS IoT Analytics，您的 Lambda 函数会[创建一个包含一个或多个`IoTAnalyticsConfig`对象的导出定义的流](work-with-streams.md#streammanagerclient-create-message-stream)。此对象定义导出设置，例如目标频道、批次大小、批次间隔和优先级。

当您的 Lambda 函数从设备接收数据时，它们会[将包含大量数据的消息附加](work-with-streams.md#streammanagerclient-append-message)到目标流。

然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

## Amazon Kinesis data streams
<a name="export-to-kinesis"></a>

流管理器支持自动导出到 Amazon Kinesis Data Streams。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用于聚合大量数据并将其加载到数据仓库或 map-reduce 集群中。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。

在 AWS IoT Greengrass 核心软件开发工具包中，您的 Lambda 函数使用`KinesisConfig`来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.KinesisConfig)在 Python 软件开发工具包中
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html)在 Java 开发工具包中
+ [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html)在 Node.js SDK 中

### 要求
<a name="export-to-kinesis-reqs"></a>

此导出目的地具有以下要求：
+ Kinesis Data Streams 中的目标流必须与 Greengrass AWS 区域 组 AWS 账户 相同且处于相同状态。
+ [Greengrass 组角色](group-role.md) 必须允许对数据流的 `kinesis:PutRecords` 权限。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
      "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

要创建导出到 Kinesis Data Streams 的流，您的 Lambda 函数[会创建一个包含一个或多个 `KinesisConfig` 对象的导出定义的流](work-with-streams.md#streammanagerclient-create-message-stream)。此对象定义导出设置，例如目标数据流、批次大小、批次间隔和优先级。

当您的 Lambda 函数从设备接收数据时，它们会[将包含大量数据的消息附加](work-with-streams.md#streammanagerclient-append-message)到目标流。然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

流管理器会为上传到 Amazon Kinesis 的每条记录生成一个唯一的随机 UUID 作为分区键。

 

## AWS IoT SiteWise 资产属性
<a name="export-to-iot-sitewise"></a>

直播管理器支持自动导出到 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 允许您大规模收集、组织和分析来自工业设备的数据。有关更多信息，请参阅[什么是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) 在《*AWS IoT SiteWise 用户指南》*中。

在 AWS IoT Greengrass 核心软件开发工具包中，您的 Lambda 函数使用`IoTSiteWiseConfig`来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python SDK TSite WiseConfig 中的 [Io](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTSiteWiseConfig)
+ Java SDK TSite WiseConfig 中的 [Io](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html)
+ Node.js SDK TSite WiseConfig 中的 [Io](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html)

**注意**  
AWS 还提供了[物联网 SiteWise 连接器](iot-sitewise-connector.md)，这是一个预先构建的解决方案，可以与 OPC-UA 源一起使用。

### 要求
<a name="export-to-iot-sitewise-reqs"></a>

此导出目的地具有以下要求：
+ 中的目标资产属性 AWS IoT SiteWise 必须与 Greengrass 组相同 AWS 账户 。 AWS 区域 
**注意**  
有关 AWS IoT SiteWise 支持的区域列表，请参阅*AWS 一般参考*中的[AWS IoT SiteWise 终端节点和配额](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region)。
+ [Greengrass 组角色](group-role.md) 必须允许对目标资产属性的 `iotsitewise:BatchPutAssetPropertyValue` 权限。以下示例策略使用 `iotsitewise:assetHierarchyPath` 条件键来授予对目标根资产及其子项的访问权限。您可以`Condition`从策略中删除，以允许访问您的所有 AWS IoT SiteWise 资产或指定单个 ARNs 资产。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
               "Effect": "Allow",
               "Action": "iotsitewise:BatchPutAssetPropertyValue",
               "Resource": "*",
               "Condition": {
                   "StringLike": {
                       "iotsitewise:assetHierarchyPath": [
                           "/root node asset ID",
                           "/root node asset ID/*"
                       ]
                   }
               }
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

  有关重要的安全信息，请参阅*AWS IoT SiteWise 用户指南*中的[ BatchPutAssetPropertyValue 授权](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action)。

### 导出到 AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

要创建导出到的流 AWS IoT SiteWise，您的 Lambda 函数会[创建一个包含一个或多个`IoTSiteWiseConfig`对象的导出定义的流](work-with-streams.md#streammanagerclient-create-message-stream)。此对象定义导出设置，例如批次大小、批次间隔和优先级。

当您的 Lambda 函数从设备接收资产属性数据时，它们会将包含数据的消息附加到目标流。消息是 JSON 序列化的 `PutAssetPropertyValueEntry` 对象，其中包含一个或多个资产属性的属性值。有关更多信息，请参阅为 AWS IoT SiteWise 导出目标[追加消息](work-with-streams.md#streammanagerclient-append-message-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>当您向发送数据时 AWS IoT SiteWise，您的数据必须满足`BatchPutAssetPropertyValue`操作的要求。有关更多信息，请参阅《AWS IoT SiteWise API Reference》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

 

您可以调整流管理器设置和 Lambda 函数逻辑来设计您的导出策略。例如：
+ 对于近乎实时的导出，请设置较低的批量大小和间隔设置，并在收到数据时将数据追加到流中。
+ 为了优化批处理、缓解带宽限制或最大限度地降低成本，您的 Lambda 函数可以在 timestamp-quality-value将数据追加到流之前，为单个资产属性汇集收到的 (TQV) 数据点。一种策略是在一条消息中批量输入多达 10 种不同的财产资产组合或属性别名，而不是为同一个属性发送多个条目。这有助于流管理器保持在[AWS IoT SiteWise 配额](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html)范围内。

 

## Amazon S3 对象
<a name="export-to-s3"></a>

流管理器支持自动导出到 Amazon S3。<a name="s3-export-destination"></a>您可以使用 Amazon S3 存储和检索大量的数据。有关更多信息，请参阅 [Amazon Simple Storage Service 开发人员指南](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)中的*什么是 Amazon S3？*。

在 AWS IoT Greengrass 核心软件开发工具包中，您的 Lambda 函数使用`S3ExportTaskExecutorConfig`来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python 软件开发工具包ExportTaskExecutorConfig中的 S@@ [3](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskExecutorConfig)
+ Java 开发工具包ExportTaskExecutorConfig中的 [S3](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html)
+ Node.js 软件开发工具包ExportTaskExecutorConfig中的 [S3](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html)

### 要求
<a name="export-to-s3-reqs"></a>

此导出目的地具有以下要求：
+ 目标 Amazon S3 存储桶必须与 Greengrass 组 AWS 账户 相同。
+ 如果 Greengrass 组的[默认容器化](lambda-group-config.md#lambda-containerization-groupsettings)是 **Greengrass 容器**，则必须将 [STREAM\$1MANAGER\$1READ\$1ONLY\$1DIRS](configure-stream-manager.md#stream-manager-read-only-directories) 参数设置为使用位于 `/tmp` 下或不在根文件系统下的输入文件目录。
+ 如果在 Greengrass **容器**模式下运行的 Lambda 函数将输入文件写入输入文件目录，则必须为该目录创建本地卷资源，并将该目录挂载到具有写入权限的容器中。这样可以确保将文件写入根文件系统并在容器外部可见。有关更多信息，请参阅 [使用 Lambda 函数和连接器访问本地资源](access-local-resources.md)。
+ [Greengrass 组角色](group-role.md) 必须允许对目标存储桶具有以下权限。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "s3:PutObject",
                  "s3:AbortMultipartUpload",
                  "s3:ListMultipartUploadParts"
              ],
              "Resource": [
                  "arn:aws:s3:::bucket-1-name/*",
                  "arn:aws:s3:::bucket-2-name/*"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 Amazon S3
<a name="export-streams-to-s3"></a>

为了创建导出到 Amazon S3 的流，您的 Lambda 函数使用 `S3ExportTaskExecutorConfig` 对象来配置导出策略。该策略定义了导出设置，例如分段上传阈值和优先级。对于 Amazon S3 导出，流管理器会上传它从核心设备上的本地文件中读取的数据。要启动上传，您的 Lambda 函数会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按照任务附加到流中的顺序执行任务。

**注意**  
<a name="bucket-not-key-must-exist"></a>目标存储桶必须已经存在于您的中 AWS 账户。如果指定密钥的对象不存在，则流管理器会为您创建该对象。

 下图显示了该高级别流程。

![\[Amazon S3 导出的流管理器工作流程示意图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/stream-manager-s3.png)


Stream Manager 使用分段上传阈值属性、[最小分段大小](configure-stream-manager.md#stream-manager-minimum-part-size)设置和输入文件的大小来确定如何上传数据。分段上传阈值必须大于或等于最小分段大小。如果要并行上传数据，则可以创建多个流。

指定您的目标 Amazon S3 对象的密钥可以在`!{timestamp:value}`占位符中包含有效的 [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 字符串。您可以使用这些时间戳占位符根据输入文件数据的上传时间对 Amazon S3 中的数据进行分区。例如，以下键名解析为诸如 `my-key/2020/12/31/data.txt` 之类的值。

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**注意**  
如果要监控流的导出状态，请先创建一个状态流，然后将导出流配置为使用该状态流。有关更多信息，请参阅 [监控导出任务](#monitor-export-status-s3)。

#### 管理输入数据
<a name="manage-s3-input-data"></a>

您可以编写代码，供物联网应用用来管理输入数据的生命周期。以下示例工作流程显示了如何使用 Lambda 函数来管理这些数据。

1. 本地进程从设备或外围设备接收数据，然后将数据写入核心设备上目录中的文件。这些是流管理器的输入文件。
**注意**  
要确定是否必须配置对输入文件目录的访问权限，请参阅 [STREAM\$1MANAGER\$1READ\$1ONLY\$1DIRS](configure-stream-manager.md#stream-manager-read-only-directories) 参数。  
流管理器运行的进程继承了该组[默认访问身份](lambda-group-config.md#lambda-access-identity-groupsettings)的所有文件系统权限。流管理器必须拥有访问输入文件的权限。如有必要，您可以使用 `chmod(1)` 命令更改文件的权限。

1. Lambda 函数会扫描该目录，并在创建新文件时[将导出任务附加](work-with-streams.md#streammanagerclient-append-message-export-task)到目标流。该任务是一个 JSON 序列化 `S3ExportTaskDefinition` 对象，用于指定输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选的用户元数据。

1. 流管理器读取输入文件并按照附加任务的顺序将数据导出到 Amazon S3。<a name="bucket-not-key-must-exist"></a>目标存储桶必须已经存在于您的中 AWS 账户。如果指定密钥的对象不存在，则流管理器会为您创建该对象。

1. Lambda 函数从状态流[读取消息](work-with-streams.md#streammanagerclient-read-messages)以监控导出状态。导出任务完成后，Lambda 函数可以删除相应的输入文件。有关更多信息，请参阅 [监控导出任务](#monitor-export-status-s3)。

### 监控导出任务
<a name="monitor-export-status-s3"></a>

您可以编写代码，让物联网应用程序监控您的 Amazon S3 导出状态。您的 Lambda 函数必须创建状态流，然后配置导出流以将状态更新写入状态流。单个状态流可以接收来自导出到 Amazon S3 的多个流的状态更新。

首先，[创建一个流](work-with-streams.md#streammanagerclient-create-message-stream)以用作状态流。您可以为流配置大小和保留策略，以控制状态消息的生命周期。例如：
+ 如果您不想存储状态消息，请将 `Persistence` 设置为 `Memory`。
+ 将 `StrategyOnFull` 设置为 `OverwriteOldestData`，这样新的状态消息就不会丢失。

然后，创建或更新导出流以使用状态流。具体而言，设置流 `S3ExportTaskExecutorConfig` 导出配置的状态配置属性。这会告诉流管理器将有关导出任务的状态消息写入状态流。在 `StatusConfig` 对象中，指定状态流的名称和详细程度。以下支持的值范围从最低 verbose (`ERROR`) 到最长 verbose (`TRACE`) 不等。默认值为 `INFO`。
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

 

以下示例工作流程显示了 Lambda 函数如何使用状态流来监控导出状态。

1. 如前面的工作流程所述，Lambda 函数[将导出任务附加](work-with-streams.md#streammanagerclient-append-message-export-task)到流，后者配置为将有关导出任务的状态消息写入状态流。附加操作返回一个表示任务 ID 的序列号。

1. Lambda 函数按顺序[读取](work-with-streams.md#streammanagerclient-read-messages)状态流中的消息，然后根据流名称和任务 ID 或消息上下文中的导出任务属性筛选消息。例如，Lambda 函数可以按导出任务的输入文件 URL 进行筛选，该文件由消息上下文中的 `S3ExportTaskDefinition` 对象表示。

   以下状态代码指示导出任务已达到完成状态：
   + `Success`。上传已成功完成。
   + `Failure`。流管理器遇到错误，例如，指定的存储桶不存在。解决问题后，您可以再次将导出任务追加到流中。
   + `Canceled`。 由于流或导出定义已删除，或者任务的 time-to-live (TTL) 期限已过期，该任务已中止。
**注意**  
该任务的状态也可能为 `InProgress` 或 `Warning`。当事件返回不影响任务执行的错误时，流管理器会发出警告。例如，如果无法清理已中止的部分上传，则会返回警告。

1. 导出任务完成后，Lambda 函数可以删除相应的输入文件。

以下示例显示 Lambda 函数如何读取和处理状态消息。

------
#### [ Python ]

```
import time
from greengrasssdk.stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from greengrasssdk.stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[阅读消息 \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.StatusMessage)](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[阅读消息 \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require('aws-greengrass-core-sdk').StreamManager;

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[阅读消息 \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages)

------

# 将数据流导出到 AWS 云 （控制台）
<a name="stream-manager-console"></a>

本教程向您展示如何使用 AWS IoT 控制台配置和部署启用直播管理器的 AWS IoT Greengrass 群组。该组包含一个用户定义的 Lambda 函数，该函数可以在流管理器中写入流，然后将其自动导出到 AWS 云中。

流管理器使得摄取、处理和导出大容量数据流更高效也更可靠。在本教程中，您将创建一个使用 IoT 数据的 `TransferStream` Lambda 函数。Lambda 函数使用 AWS IoT Greengrass Core SDK 在流管理器中创建流，然后对其进行读取和写入。然后，流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。

![\[流管理工作流图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/stream-manager-scenario.png)


本教程的重点是展示用户定义的 Lambda 函数如何使用 AWS IoT Greengrass Core SDK 中的`StreamManagerClient`对象与流管理器进行交互。为简单起见，您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

## 先决条件
<a name="stream-manager-console-prerequisites"></a>

要完成此教程，需要：<a name="stream-manager-howto-prereqs"></a>
+ Greengrass 组和 Greengrass Core（v1.10 或更高版本）。有关如何创建 Greengrass 组和核心的信息，请参阅 [入门 AWS IoT Greengrass](gg-gs.md)。入门教程还包括安装 AWS IoT Greengrass 核心软件的步骤。
**注意**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a> OpenWrt 发行版不支持直播管理器。
+ 核心设备上安装的 Java 8 运行时 (JDK 8)。<a name="install-java8-runtime-general"></a>
  + 对于基于 Debian 的发行版（包括 Raspbian）或基于 Ubuntui 的发行版，运行以下命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 对于基于 Red Hat 的发行版（包括 Amazon Linux），请运行以下命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    有关更多信息，请参阅 OpenJDK 文档中的[如何下载并安装预先构建的 OpenJDK 程序包](https://openjdk.java.net/install/)。
+ AWS IoT Greengrass 适用于 Python 的核心开发工具包 v1.5.0 或更高版本。要在适用于 Python 的 AWS IoT Greengrass Core 软件开发工具包中使用 `StreamManagerClient`，您必须：
  + 在核心设备上安装 Python 3.7 或更高版本。
  + 将开发工具包和其依赖项包含在 Lambda 函数部署程序包中。本教程中提供了说明。
**提示**  
可以将 `StreamManagerClient` 与 Java 或 NodeJS 结合使用。有关示例代码，请参阅适用于 [Java 的AWS IoT Greengrass Core SDK](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) 和[适用于 Node.js 的AWS IoT Greengrass 酷睿 SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) GitHub。
+ 在 Amazon Kinesis Data Streams 中**MyKinesisStream**创建的目标流，名称与你的 Greengrass 群组 AWS 区域 相同。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream)。
**注意**  
在本教程中，流管理器将数据导出到 Kinesis Data Streams，这将向您的 AWS 账户账户收取费用。有关定价的信息，请参阅 [Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。  
为避免产生费用，您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下，您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。
+ 一个添加到了 `kinesis:PutRecords` 的 IAM policy，该策略允许对目标数据流执行 [Greengrass 组角色](group-role.md) 操作，如以下示例所示：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }
  ```

------

本教程包含以下概括步骤：

1. [创建 Lambda 函数部署程序包](#stream-manager-console-create-deployment-package)

1. [创建 Lambda 函数](#stream-manager-console-create-function)

1. [将函数添加到组](#stream-manager-console-create-gg-function)

1. [启用流管理器](#stream-manager-console-enable-stream-manager)

1. [配置本地日志记录](#stream-manager-console-configure-logging)

1. [部署组](#stream-manager-console-create-deployment)

1. [测试应用程序](#stream-manager-console-test-application)

完成本教程大约需要 20 分钟。

## 步骤 1：创建 Lambda 函数部署程序包
<a name="stream-manager-console-create-deployment-package"></a>

在此步骤中，您创建包含 Python 函数代码和依赖项的 Lambda 函数部署包。您稍后在 AWS Lambda中创建 Lambda 函数时上传此程序包。Lambda 函数使用 AWS IoT Greengrass 核心软件开发工具包来创建本地流并与之交互。

**注意**  
 用户定义的 Lambda 函数必须使用 [AWS IoT Greengrass 核心开发工具包](lambda-functions.md#lambda-sdks-core)与流管理器交互。有关 Greengrass 流管理器的要求的更多信息，请参阅 [Greengrass 流管理器要求](stream-manager.md#stream-manager-requirements)。

1.  下载[适用于 Python 的AWS IoT Greengrass Core 开发工具包](lambda-functions.md#lambda-sdks-core) v1.5.0 或更高版本。

1. <a name="unzip-ggc-sdk"></a>解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 `greengrasssdk` 文件夹。

1. <a name="install-python-sdk-dependencies-stream-manager"></a>安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。<a name="python-sdk-dependencies-stream-manager"></a>

   1. 导航到包含该 `requirements.txt` 文件的开发工具包目录。此文件列出了依赖项。

   1. 安装开发工具包依赖项。例如，运行以下 `pip` 命令将它们安装在当前目录中：

      ```
      pip install --target . -r requirements.txt
      ```

1. 将以下 Python 代码函数保存在名为 `transfer_stream.py` 的本地文件中。
**提示**  
 有关使用 Java 和 NodeJS 的示例代码，请参阅AWS IoT Greengrass 适用于 J [ava 的 Core SDK AWS IoT Greengrass 和适用于 Node.js](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) [的酷睿 SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。 GitHub

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. 将以下项目压缩到名为 `transfer_stream_python.zip` 的文件中。此即 Lambda 函数部署程序包。
   + **transfer\$1stream.py**。应用程序逻辑。
   + **greengrasssdk**。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

     适用于 Python 的 AWS IoT Greengrass 核心 SDK 版本 1.5.0 或更高版本中提供了@@ [流管理器操作](work-with-streams.md)。
   + 你为适用于 Python 的 AWS IoT Greengrass 核心开发工具包安装的依赖项（例如，`cbor2`目录）。

   创建 `zip` 文件时，仅包含这些项目，而不是包含文件夹。

## 第 2 步：创建 Lambda 函数
<a name="stream-manager-console-create-function"></a>

在此步骤中，您将使用 AWS Lambda 控制台创建 Lambda 函数并将其配置为使用您的部署包。接着，发布函数版本并创建别名。

1. 首先，创建 Lambda 函数。

   1. <a name="lambda-console-open"></a>在中 AWS 管理控制台，选择**服务**，然后打开 AWS Lambda 控制台。

   1. <a name="lambda-console-create-function"></a>选择 **创建函数**，然后选择 **从头开始创作**。

   1. 在**基本信息**部分中，使用以下值：
      + 对于**函数名称**，请输入 **TransferStream**。
      + 对于**运行时系统**，选择 **Python 3.7**。
      + 对于**权限**，请保留默认设置。这将创建一个授予基本 Lambda 权限的执行角色。此角色未被使用 AWS IoT Greengrass。

   1. <a name="lambda-console-save-function"></a>在页面底部，选择**创建函数**。

1. 接下来，注册处理程序并上传您的 Lambda 函数部署程序包。

   1. <a name="lambda-console-upload"></a>在**代码**选项卡上的**代码源**下，选择**上传自**。从下拉列表中选择 **.zip 文件**。  
![\[“上传自”下拉列表中突出显示了.zip 文件。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/lra-console/upload-deployment-package.png)

   1. 选择**上传**，然后选择您的 `transfer_stream_python.zip` 部署包。然后，选择**保存**。

   1. <a name="lambda-console-runtime-settings-para"></a>在函数的**代码**选项卡中，在**运行时设置**下选择**编辑**，然后输入以下值。
      + 对于**运行时系统**，选择 **Python 3.7**。
      + 对于**处理程序**，输入 **transfer\$1stream.function\$1handler**。

   1. <a name="lambda-console-save-config"></a>选择**保存**。
**注意**  
 AWS Lambda 主机上的 “**测试**” 按钮不适用于此功能。 AWS IoT Greengrass 核心软件开发工具包不包含在控制台中独立运行 Greengrass Lambda 函数所需的模块。 AWS Lambda 这些模块（例如 `greengrass_common`）是在函数部署到您的 Greengrass 核心之后提供给它们的。

1. 现在，发布 Lambda 函数的第一个版本并创建[此版本的别名](https://docs.aws.amazon.com/lambda/latest/dg/versioning-aliases.html)。
**注意**  
Greengrass 组可以按别名（推荐）或版本引用 Lambda 函数。使用别名，您可以更轻松地管理代码更新，因为您在更新函数代码时，不必更改订阅表或组定义。相反，您只需将别名指向新的函数版本。

   1. <a name="shared-publish-function-version"></a>在**操作**菜单上，选择**发布新版本**。

   1. <a name="shared-publish-function-version-description"></a>对于**版本描述**，输入 **First version**，然后选择**发布**。

   1. 在**TransferStream:1** 配置页面上，从 “**操作**” 菜单中选择 “**创建别名**”。

   1. 在**创建新别名**页面上，使用以下值：
      + 对于**名称**，输入 **GG\$1TransferStream**。
      + 对于**版本**，选择 **1**。
**注意**  
AWS IoT Greengrass **不支持 \$1LATEST 版本的 Lambda 别名。**

   1. 选择**创建**。

现在，您已准备就绪，可以将 Lambda 函数添加到 Greengrass 组。

## 步骤 3：将 Lambda 函数添加到 Greengrass 组
<a name="stream-manager-console-create-gg-function"></a>

在该步骤中，您将 Lambda 函数添加到该组，然后配置其生命周期和环境变量。有关更多信息，请参阅 [使用组特定的配置控制 Greengrass Lambda 函数的执行](lambda-group-config.md)。

1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

1. <a name="group-choose-target-group"></a>选择目标组。

1. <a name="choose-add-lambda"></a>在组配置页面上，选择**Lambda 函数**选项卡。

1. 在**我的 Lambda 函数**部分下，选择**添加**。

1. 在**添加 Lambda 函数**页面上，为您的 Lambda 函数选择 **Lambda 函数**。

1. **对于 **Lambda 版本**，请选择别名:gg\$1。TransferStream**

   现在，配置用于确定 Greengrass 组中 Lambda 函数的行为的属性。

1. 在 **Lambda 函数配置**部分中，进行以下更改：
   + 将 **Memory limit (内存限制)** 设置为 32 MB。
   + 对于**已固定**，选择 **True**。
**注意**  
<a name="long-lived-lambda"></a>*长寿命*（或*固定*）的 Lambda 函数在启动后自动启动，并在 AWS IoT Greengrass 自己的容器中继续运行。这与*按需* Lambda 函数相反，后者在调用时启动，并在没有要运行的任务时停止。有关更多信息，请参阅 [Greengrass Lambda 函数的生命周期配置](lambda-functions.md#lambda-lifecycle)。

1. 选择**添加 Lambda 函数**。

## 步骤 4：启用流管理器
<a name="stream-manager-console-enable-stream-manager"></a>

在此步骤中，您确保启用流管理器。

1. 在组配置页面上，选择**Lambda 函数**选项卡。

1. 在**系统 Lambda 函数**下，选择**流管理器**，然后检查状态。如果禁用，请选择 **Edit (编辑)**。然后，选择 **Enable (启用)** 和 **Save (保存)**。您可以对本教程使用默认参数设置。有关更多信息，请参阅 [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)。

**注意**  <a name="ggstreammanager-function-config-console"></a>
使用控制台启用流管理器并部署组时，流管理器的内存大小默认设置为 4194304 KB (4 GB)。建议您将内存大小设置为至少 128000 KB。

## 步骤 5：配置本地日志记录
<a name="stream-manager-console-configure-logging"></a>

在此步骤中，您将在组中配置 AWS IoT Greengrass 系统组件、用户定义的 Lambda 函数和连接器，以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息，请参阅 [使用 AWS IoT Greengrass 日志进行监控](greengrass-logs-overview.md)。

1. <a name="shared-group-settings-local-logs-configuration"></a>在 **Local logs configuration (本地日志配置)** 下，检查是否配置了本地日志记录。

1. <a name="shared-group-settings-local-logs-edit"></a>如果未为 Greengrass 系统组件或用户定义的 Lambda 函数配置日志，请选择**编辑**。

1. <a name="shared-group-settings-local-logs-event-source"></a>选择**用户 Lambda 函数日志级别**和 **Greengrass 系统日志级别**。

1. <a name="shared-group-settings-local-logs-save"></a>保留日志记录级别和磁盘空间限制的默认值，然后选择 **Save (保存)**。

## 步骤 6：部署 Greengrass 组
<a name="stream-manager-console-create-deployment"></a>

将组部署到核心设备。

1. <a name="shared-deploy-group-checkggc"></a>确保 AWS IoT Greengrass 核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

   1. 要检查进程守护程序是否正在运行，请执行以下操作：

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      如果输出包含 `root` 的 `/greengrass/ggc/packages/ggc-version/bin/daemon` 条目，则表示进程守护程序正在运行。
**注意**  
路径中的版本取决于 AWS IoT Greengrass 核心设备上安装的 Core 软件版本。

   1. 启动进程守护程序：

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="shared-deploy-group-deploy"></a>在组配置页面上，选择**部署**。

1. <a name="shared-deploy-group-ipconfig"></a>

   1. 在 **Lambda 函数**选项卡的**系统 Lambda 函数**部分下，选择 **IP 检测器**，再选择**编辑**。

   1. 在**编辑 IP 检测器设置**对话框中，选择**自动检测和覆盖 MQTT 代理端点**。

   1. 选择**保存**。

      这使得设备可以自动获取核心的连接信息，例如 IP 地址、DNS 和端口号。建议自动检测，但 AWS IoT Greengrass 也支持手动指定的端点。只有在首次部署组时，系统才会提示您选择发现方法。
**注意**  
如果出现提示，请授予创建 [Greengrass 服务角色并将其与当前](service-role.md)角色关联的权限。 AWS 账户 AWS 区域此角色 AWS IoT Greengrass 允许访问您在 AWS 服务中的资源。

      **部署**页面显示了部署时间戳、版本 ID 和状态。完成后，部署的状态应显示为 **已完成**。

      有关问题排查帮助，请参阅[故障排除 AWS IoT Greengrass](gg-troubleshooting.md)。

## 步骤 7：测试应用程序
<a name="stream-manager-console-test-application"></a>

`TransferStream` Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>在 Amazon Kinesis 控制台的 Kinesi **s 数据流**下，选择。**MyKinesisStream**
**注意**  
如果您在运行教程时没有目标 Kinesis 数据流， [请检查流管理器的日志文件](stream-manager-cli.md#stream-manager-cli-logs) (`GGStreamManager`)。如果它在错误消息中包含 `export stream MyKinesisStream doesn't exist`，则测试成功。此错误意味着服务试图导出到流，但流不存在。

1. <a name="stream-manager-howto-view-put-records"></a>在**MyKinesisStream**页面上，选择**监控**。如果测试成功，您应在 **Put Records (放置记录)** 图表中看到数据。根据您的连接，显示数据可能需要一分钟时间。
**重要**  
测试完成后，删除 Kinesis 数据流以避免产生更多费用。  
运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息，直到您准备好继续测试。  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. 从核心中移除 **TransferStream**Lambda 函数。

   1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

   1. 在 **Greengrass 组**下，选择您的组。

   1. 在 **Lambdas** 页面上，为**TransferStream**函数选择省略号 (**...**)，然后选择**移除**函数。

   1. 从**操作**中，选择**部署**。

要查看日志记录信息或解决流的问题，请检查日志中的 `TransferStream` 和 `GGStreamManager` 函数。您必须具有读取文件系统 AWS IoT Greengrass 日志的`root`权限。
+ `TransferStream` 将日志条目写入 `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`。
+ `GGStreamManager` 将日志条目写入 `greengrass-root/ggc/var/log/system/GGStreamManager.log`。

如果您需要更多故障排除信息，可以将**用户 Lambda 日志**的[日志级别](#stream-manager-console-configure-logging)设置为**调试日志**，然后再次部署该组。

## 另请参阅
<a name="stream-manager-console-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的数据流](stream-manager.md)
+ [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)
+ [StreamManagerClient 用于处理直播](work-with-streams.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)

# 将数据流导出到 AWS 云 (CLI)
<a name="stream-manager-cli"></a>

本教程向您展示如何使用配置和部署启用了流管理器的 AWS IoT Greengrass 群组。 AWS CLI 该组包含一个用户定义的 Lambda 函数，该函数可以在流管理器中写入流，然后将其自动导出到 AWS 云中。

流管理器使得摄取、处理和导出大容量数据流更高效也更可靠。在本教程中，您将创建一个使用 IoT 数据的 `TransferStream` Lambda 函数。Lambda 函数使用 AWS IoT Greengrass Core SDK 在流管理器中创建流，然后对其进行读取和写入。然后，流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。

![\[流管理工作流图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/stream-manager-scenario.png)


本教程的重点是展示用户定义的 Lambda 函数如何使用 AWS IoT Greengrass Core SDK 中的`StreamManagerClient`对象与流管理器进行交互。为简单起见，您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

当您使用 AWS IoT Greengrass API（其中包含 Greengrass 命令）创建群组时，直播管理器默认处于 AWS CLI禁用状态。要在核心上启用流管理器，您需要[创建一个函数定义版本](#stream-manager-cli-create-function-definition)，其中包括系统 `GGStreamManager` Lambda 函数和一个引用新的函数定义版本的组版本。然后部署组。

## 先决条件
<a name="stream-manager-cli-prerequisites"></a>

要完成此教程，需要：<a name="stream-manager-howto-prereqs"></a>
+ Greengrass 组和 Greengrass Core（v1.10 或更高版本）。有关如何创建 Greengrass 组和核心的信息，请参阅 [入门 AWS IoT Greengrass](gg-gs.md)。入门教程还包括安装 AWS IoT Greengrass 核心软件的步骤。
**注意**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a> OpenWrt 发行版不支持直播管理器。
+ 核心设备上安装的 Java 8 运行时 (JDK 8)。<a name="install-java8-runtime-general"></a>
  + 对于基于 Debian 的发行版（包括 Raspbian）或基于 Ubuntui 的发行版，运行以下命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 对于基于 Red Hat 的发行版（包括 Amazon Linux），请运行以下命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    有关更多信息，请参阅 OpenJDK 文档中的[如何下载并安装预先构建的 OpenJDK 程序包](https://openjdk.java.net/install/)。
+ AWS IoT Greengrass 适用于 Python 的核心开发工具包 v1.5.0 或更高版本。要在适用于 Python 的 AWS IoT Greengrass Core 软件开发工具包中使用 `StreamManagerClient`，您必须：
  + 在核心设备上安装 Python 3.7 或更高版本。
  + 将开发工具包和其依赖项包含在 Lambda 函数部署程序包中。本教程中提供了说明。
**提示**  
可以将 `StreamManagerClient` 与 Java 或 NodeJS 结合使用。有关示例代码，请参阅适用于 [Java 的AWS IoT Greengrass Core SDK](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) 和[适用于 Node.js 的AWS IoT Greengrass 酷睿 SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) GitHub。
+ 在 Amazon Kinesis Data Streams 中**MyKinesisStream**创建的目标流，名称与你的 Greengrass 群组 AWS 区域 相同。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream)。
**注意**  
在本教程中，流管理器将数据导出到 Kinesis Data Streams，这将向您的 AWS 账户账户收取费用。有关定价的信息，请参阅 [Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。  
为避免产生费用，您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下，您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。
+ 一个添加到了 `kinesis:PutRecords` 的 IAM policy，该策略允许对目标数据流执行 [Greengrass 组角色](group-role.md) 操作，如以下示例所示：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }
  ```

------<a name="aws-cli-howto-prereqs"></a>
+  AWS CLI 已在您的计算机上安装和配置。有关更多信息，请参阅 *AWS Command Line Interface 用户指南*中的[安装 AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) 和[配置 AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html) 。

   

  本教程中的示例命令是针对 Linux 及其他基于 Unix 的系统编写的。如果你使用的是 Windows，有关语法差异[的更多信息，请参阅为 AWS 命令行界面指定参数值](https://docs.aws.amazon.com/cli/latest/userguide/cli-using-param.html)。

  如果命令包含 JSON 字符串，本教程提供了在单行包含 JSON 的示例。在某些系统上，使用此格式可能会更高效地编辑和运行命令。

 

本教程包含以下概括步骤：

1. [创建 Lambda 函数部署程序包](#stream-manager-cli-create-deployment-package)

1. [创建 Lambda 函数](#stream-manager-cli-create-function)

1. [创建函数定义和版本](#stream-manager-cli-create-function-definition)

1. [创建记录器定义和版本](#stream-manager-cli-create-logger-definition)

1. [获取核心定义版本的 ARN](#stream-manager-cli-get-core-definition-version-arn)

1. [创建组版本](#stream-manager-cli-create-group-version)

1. [创建 部署。](#stream-manager-cli-create-deployment)

1. [测试应用程序](#stream-manager-cli-test-application)

完成本教程大约需要 30 分钟。

## 步骤 1：创建 Lambda 函数部署程序包
<a name="stream-manager-cli-create-deployment-package"></a>

在此步骤中，您创建包含 Python 函数代码和依赖项的 Lambda 函数部署包。您稍后在 AWS Lambda中创建 Lambda 函数时上传此程序包。Lambda 函数使用 AWS IoT Greengrass 核心软件开发工具包来创建本地流并与之交互。

**注意**  
 用户定义的 Lambda 函数必须使用 [AWS IoT Greengrass 核心开发工具包](lambda-functions.md#lambda-sdks-core)与流管理器交互。有关 Greengrass 流管理器的要求的更多信息，请参阅 [Greengrass 流管理器要求](stream-manager.md#stream-manager-requirements)。

1.  下载[适用于 Python 的AWS IoT Greengrass Core 开发工具包](lambda-functions.md#lambda-sdks-core) v1.5.0 或更高版本。

1. <a name="unzip-ggc-sdk"></a>解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 `greengrasssdk` 文件夹。

1. <a name="install-python-sdk-dependencies-stream-manager"></a>安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。<a name="python-sdk-dependencies-stream-manager"></a>

   1. 导航到包含该 `requirements.txt` 文件的开发工具包目录。此文件列出了依赖项。

   1. 安装开发工具包依赖项。例如，运行以下 `pip` 命令将它们安装在当前目录中：

      ```
      pip install --target . -r requirements.txt
      ```

1. 将以下 Python 代码函数保存在名为 `transfer_stream.py` 的本地文件中。
**提示**  
 有关使用 Java 和 NodeJS 的示例代码，请参阅AWS IoT Greengrass 适用于 J [ava 的 Core SDK AWS IoT Greengrass 和适用于 Node.js](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) [的酷睿 SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。 GitHub

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. 将以下项目压缩到名为 `transfer_stream_python.zip` 的文件中。此即 Lambda 函数部署程序包。
   + **transfer\$1stream.py**。应用程序逻辑。
   + **greengrasssdk**。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

     适用于 Python 的 AWS IoT Greengrass 核心 SDK 版本 1.5.0 或更高版本中提供了@@ [流管理器操作](work-with-streams.md)。
   + 你为适用于 Python 的 AWS IoT Greengrass 核心开发工具包安装的依赖项（例如，`cbor2`目录）。

   创建 `zip` 文件时，仅包含这些项目，而不是包含文件夹。

## 第 2 步：创建 Lambda 函数
<a name="stream-manager-cli-create-function"></a>

1. <a name="cli-create-empty-lambda-role"></a>创建 IAM 角色，以便您可以在创建该函数时传入角色 ARN。

------
#### [ JSON Expanded ]

   ```
   aws iam create-role --role-name Lambda_empty --assume-role-policy '{
       "Version": "2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Principal": {
                   "Service": "lambda.amazonaws.com"
               },
              "Action": "sts:AssumeRole"
           }
       ]
   }'
   ```

------
#### [ JSON Single-line ]

   ```
   aws iam create-role --role-name Lambda_empty --assume-role-policy '{"Version": "2012-10-17",		 	 	  "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
   ```

------
**注意**  
AWS IoT Greengrass 不使用这个角色，因为你的 Greengrass Lambda 函数的权限是在 Greengrass 群组角色中指定的。对于本教程，您将创建一个空角色。

1. <a name="cli-copy-lambda-role-arn"></a>从输出中复制 `Arn`。

1. 使用 AWS Lambda API 创建`TransferStream`函数。以下命令假定该 zip 文件位于当前目录中。
   + 将 *role-arn* 替换为复制的 `Arn`。

   ```
   aws lambda create-function \
   --function-name TransferStream \
   --zip-file fileb://transfer_stream_python.zip \
   --role role-arn \
   --handler transfer_stream.function_handler \
   --runtime python3.7
   ```

1. 发布该函数的版本。

   ```
   aws lambda publish-version --function-name TransferStream --description 'First version'
   ```

1. 为发布的版本创建别名。

   Greengrass 组可以按别名（推荐）或版本引用 Lambda 函数。使用别名，您可以更轻松地管理代码更新，因为您在更新函数代码时，不必更改订阅表或组定义。相反，您只需将别名指向新的函数版本。

   ```
   aws lambda create-alias --function-name TransferStream --name GG_TransferStream --function-version 1
   ```
**注意**  
AWS IoT Greengrass **不支持 \$1LATEST 版本的 Lambda 别名。**

1. 从输出中复制 `AliasArn`。在为配置函数时，您可以使用此值 AWS IoT Greengrass。

现在，您可以为配置该函数了 AWS IoT Greengrass。

## 步骤 3：创建函数定义和版本
<a name="stream-manager-cli-create-function-definition"></a>

此步骤将创建引用系统 `GGStreamManager` Lambda 函数和用户定义的 `TransferStream` Lambda 函数的函数定义版本。要在使用 AWS IoT Greengrass API 时启用流管理器，您的函数定义版本必须包含该`GGStreamManager`函数。

1. 创建一个包含初始版本的函数定义，该初始版本包含系统和用户定义的 Lambda 函数。

   以下定义版本启用了具有默认[参数设置](configure-stream-manager.md)的流管理器。要配置自定义设置，您必须为相应的流管理器参数定义环境变量。有关示例，请参阅[启用、禁用或配置流管理器 (CLI)](configure-stream-manager.md#enable-stream-manager-cli)。 AWS IoT Greengrass 对省略的参数使用默认设置。 `MemorySize`至少应该是`128000`。 `Pinned`必须设置为`true`。
**注意**  
<a name="long-lived-lambda"></a>*长寿命*（或*固定*）的 Lambda 函数在启动后自动启动，并在 AWS IoT Greengrass 自己的容器中继续运行。这与*按需* Lambda 函数相反，后者在调用时启动，并在没有要运行的任务时停止。有关更多信息，请参阅 [Greengrass Lambda 函数的生命周期配置](lambda-functions.md#lambda-lifecycle)。
   + *arbitrary-function-id*替换为函数的名称，例如**stream-manager**。
   + *alias-arn*替换为`AliasArn`您在创建 `TransferStream` Lambda 函数别名时复制的。

    

------
#### [ JSON expanded ]

   ```
   aws greengrass create-function-definition --name MyGreengrassFunctions --initial-version '{
       "Functions": [
           {
               "Id": "arbitrary-function-id",
               "FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", 
               "FunctionConfiguration": {
                   "MemorySize": 128000,
                   "Pinned": true,
                   "Timeout": 3
               }
           },
           {
               "Id": "TransferStreamFunction",
               "FunctionArn": "alias-arn",
               "FunctionConfiguration": {
                   "Executable": "transfer_stream.function_handler",
                   "MemorySize": 16000,
                   "Pinned": true,
                   "Timeout": 5
               }
           }
       ]
   }'
   ```

------
#### [ JSON single ]

   ```
   aws greengrass create-function-definition \
   --name MyGreengrassFunctions \
   --initial-version '{"Functions": [{"Id": "arbitrary-function-id","FunctionArn": "arn:aws:lambda:::function:GGStreamManager:1", "FunctionConfiguration": {"Environment": {"Variables":{"STREAM_MANAGER_STORE_ROOT_DIR": "/data","STREAM_MANAGER_SERVER_PORT": "1234","STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH": "20000"}},"MemorySize": 128000,"Pinned": true,"Timeout": 3}},{"Id": "TransferStreamFunction", "FunctionArn": "alias-arn", "FunctionConfiguration": {"Executable": "transfer_stream.function_handler", "MemorySize": 16000,"Pinned": true,"Timeout": 5}}]}'
   ```

------
**注意**  
`Timeout` 是函数定义版本所需的，但 `GGStreamManager` 不使用它。有关 `Timeout` 和其他组级别设置的更多信息，请参阅 [使用组特定的配置控制 Greengrass Lambda 函数的执行](lambda-group-config.md)。

1. 从输出中复制 `LatestVersionArn`。您将使用此值向部署到核心的组版本添加函数定义版本。

## 步骤 4：创建记录器定义和版本
<a name="stream-manager-cli-create-logger-definition"></a>

配置组的日志记录设置。在本教程中，您将配置 AWS IoT Greengrass 系统组件、用户定义的 Lambda 函数和连接器，以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息，请参阅 [使用 AWS IoT Greengrass 日志进行监控](greengrass-logs-overview.md)。

1. <a name="create-logger-definition"></a>创建一个包含初始版本的记录器定义。

------
#### [ JSON Expanded ]

   ```
   aws greengrass create-logger-definition --name "LoggingConfigs" --initial-version '{
       "Loggers": [
           {
               "Id": "1",
               "Component": "GreengrassSystem",
               "Level": "INFO",
               "Space": 10240,
               "Type": "FileSystem"
           },
           {
               "Id": "2",
               "Component": "Lambda",
               "Level": "INFO",
               "Space": 10240,
               "Type": "FileSystem"
           }
       ]
   }'
   ```

------
#### [ JSON Single-line ]

   ```
   aws greengrass create-logger-definition \
       --name "LoggingConfigs" \
       --initial-version '{"Loggers":[{"Id":"1","Component":"GreengrassSystem","Level":"INFO","Space":10240,"Type":"FileSystem"},{"Id":"2","Component":"Lambda","Level":"INFO","Space":10240,"Type":"FileSystem"}]}'
   ```

------

1. <a name="copy-logger-definition-version-id"></a>从输出中复制记录器定义的 `LatestVersionArn`。您将使用此值向部署到核心的组版本添加记录器定义版本。

## 步骤 5：获取核心定义版本的 ARN
<a name="stream-manager-cli-get-core-definition-version-arn"></a>

获取要添加到新组版本的核心定义版本的 ARN。要部署组版本，该组版本必须引用包含确切一个核心的核心定义版本。

1. <a name="get-group-id-latestversion"></a>获取目标 Greengrass 群 IDs 组和群组版本。此过程假定这是最新的组和组版本。以下查询将返回最近创建的组。

   ```
   aws greengrass list-groups --query "reverse(sort_by(Groups, &CreationTimestamp))[0]"
   ```

   或者，您也可以按名称查询。系统不要求组名称是唯一的，所以可能会返回多个组。

   ```
   aws greengrass list-groups --query "Groups[?Name=='MyGroup']"
   ```
**注意**  
<a name="find-group-ids-console"></a>您也可以在 AWS IoT 控制台中找到这些值。组 ID 显示在组的**设置**页面上。群组版本显示 IDs 在群组的 “**部署**” 选项卡上。

1. <a name="copy-target-group-id"></a>从输出中复制目标组的 `Id`。您将使用此值获取核心定义版本及部署组的时间。

1. <a name="copy-latest-group-version-id"></a>从输出中复制 `LatestVersion`，这是添加到组的最后一个版本的 ID。您将使用此值获取核心定义版本。

1. 获取核心定义版本的 ARN：

   1. 获取组版本。
      + *group-id*替换为你为`Id`该群组复制的。
      + *group-version-id*替换为你为`LatestVersion`该群组复制的。

      ```
      aws greengrass get-group-version \
      --group-id group-id \
      --group-version-id group-version-id
      ```

   1. 从输出中复制 `CoreDefinitionVersionArn`。您将使用此值向部署到核心的组版本添加核心定义版本。

## 步骤 6：创建组版本
<a name="stream-manager-cli-create-group-version"></a>

现在，您已准备就绪，可以创建一个包含您要部署的实体的组版本。要实现此目的，需要创建一个引用每种组件类型的目标版本的组版本。在本教程中，您将包括核心定义版本、函数定义版本和记录器定义版本。

1. 创建组版本。
   + *group-id*替换为你为`Id`该群组复制的。
   + *core-definition-version-arn*替换`CoreDefinitionVersionArn`为你为核心定义版本复制的。
   + *function-definition-version-arn*替换为你`LatestVersionArn`为新函数定义版本复制的函数。
   + *logger-definition-version-arn*替换为你为新`LatestVersionArn`的 Logger 定义版本复制的。

   ```
   aws greengrass create-group-version \
   --group-id group-id \
   --core-definition-version-arn core-definition-version-arn \
   --function-definition-version-arn function-definition-version-arn \
   --logger-definition-version-arn logger-definition-version-arn
   ```

1. <a name="copy-group-version-id"></a>从输出中复制 `Version`。这是新组版本的 ID。

## 步骤 7：创建部署
<a name="stream-manager-cli-create-deployment"></a>

将组部署到核心设备。

1. <a name="shared-deploy-group-checkggc"></a>确保 AWS IoT Greengrass 内核正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

   1. 要检查进程守护程序是否正在运行，请执行以下操作：

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      如果输出包含 `root` 的 `/greengrass/ggc/packages/ggc-version/bin/daemon` 条目，则表示进程守护程序正在运行。
**注意**  
路径中的版本取决于 AWS IoT Greengrass 核心设备上安装的 Core 软件版本。

   1. 启动进程守护程序：

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="create-deployment"></a>创建 部署。
   + *group-id*替换为你为`Id`该群组复制的。
   + *group-version-id*替换为你`Version`为新群组版本复制的版本。

   ```
   aws greengrass create-deployment \
   --deployment-type NewDeployment \
   --group-id group-id \
   --group-version-id group-version-id
   ```

1. <a name="copy-deployment-id"></a>从输出中复制 `DeploymentId`。

1. <a name="get-deployment-status"></a>获取部署状态。
   + *group-id*替换为你为`Id`该群组复制的。
   + *deployment-id*替换为您`DeploymentId`为部署复制的。

   ```
   aws greengrass get-deployment-status \
   --group-id group-id \
   --deployment-id deployment-id
   ```

   如果状态为 `Success`，则部署成功。有关问题排查帮助，请参阅[故障排除 AWS IoT Greengrass](gg-troubleshooting.md)。

## 步骤 8：测试应用程序
<a name="stream-manager-cli-test-application"></a>

`TransferStream` Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>在 Amazon Kinesis 控制台的 Kinesi **s 数据流**下，选择。**MyKinesisStream**
**注意**  
如果您在运行教程时没有目标 Kinesis 数据流， [请检查流管理器的日志文件](#stream-manager-cli-logs) (`GGStreamManager`)。如果它在错误消息中包含 `export stream MyKinesisStream doesn't exist`，则测试成功。此错误意味着服务试图导出到流，但流不存在。

1. <a name="stream-manager-howto-view-put-records"></a>在**MyKinesisStream**页面上，选择**监控**。如果测试成功，您应在 **Put Records (放置记录)** 图表中看到数据。根据您的连接，显示数据可能需要一分钟时间。
**重要**  
测试完成后，删除 Kinesis 数据流以避免产生更多费用。  
运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息，直到您准备好继续测试。  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. 从核心中移除 **TransferStream**Lambda 函数。

   1. 按照[步骤 6：创建组版本](#stream-manager-cli-create-group-version)创建新的组版本，但删除 `create-group-version` 命令中的 `--function-definition-version-arn` 选项。或者，创建一个不包含 **TransferStream**Lambda 函数的函数定义版本。
**注意**  
通过省略部署的组版本中的系统 `GGStreamManager` Lambda 函数，可以禁用核心上的流管理。

   1. 按照[步骤 7：创建部署](#stream-manager-cli-create-deployment)以部署新的组版本。

要查看日志记录信息或解决流的问题，请检查日志中的 `TransferStream` 和 `GGStreamManager` 函数。您必须具有读取文件系统 AWS IoT Greengrass 日志的`root`权限。
+ `TransferStream` 将日志条目写入 `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`。
+ `GGStreamManager` 将日志条目写入 `greengrass-root/ggc/var/log/system/GGStreamManager.log`。

如果需要更多疑难解答信息，可以将 `Lambda` 日志记录级别设置为 `DEBUG`，然后创建并部署新的组版本。

## 另请参阅
<a name="stream-manager-cli-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的数据流](stream-manager.md)
+ [StreamManagerClient 用于处理直播](work-with-streams.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)
+ [将数据流导出到 AWS 云 （控制台）](stream-manager-console.md)
+ <a name="see-also-iam-cli"></a>AWS Identity and Access Management 命令*参考@@ [中的 (IAM)AWS CLI 命令](https://docs.aws.amazon.com/cli/latest/reference/iam)*
+ <a name="see-also-lambda-cli"></a>[AWS Lambda 命令](https://docs.aws.amazon.com/cli/latest/reference/lambda)*参考中的AWS CLI 命令*
+ <a name="see-also-gg-cli"></a>[AWS IoT Greengrass 命令](https://docs.aws.amazon.com/cli/latest/reference/greengrass/index.html)*参考中的AWS CLI 命令*