

终止支持通知：2026 年 10 月 7 日， AWS 将停止对的支持。 AWS IoT Greengrass Version 1 2026 年 10 月 7 日之后，您将无法再访问这些 AWS IoT Greengrass V1 资源。如需了解更多信息，请访问[迁移自 AWS IoT Greengrass Version 1](https://docs.aws.amazon.com/greengrass/v2/developerguide/migrate-from-v1.html)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将数据流导出到 AWS 云 （控制台）
<a name="stream-manager-console"></a>

本教程向您展示如何使用 AWS IoT 控制台配置和部署启用直播管理器的 AWS IoT Greengrass 群组。该组包含一个用户定义的 Lambda 函数，该函数可以在流管理器中写入流，然后将其自动导出到 AWS 云中。

流管理器使得摄取、处理和导出大容量数据流更高效也更可靠。在本教程中，您将创建一个使用 IoT 数据的 `TransferStream` Lambda 函数。Lambda 函数使用 AWS IoT Greengrass Core SDK 在流管理器中创建流，然后对其进行读取和写入。然后，流管理器将流导出到 Kinesis Data Streams。下图演示了此工作流程。

![\[流管理工作流图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/stream-manager-scenario.png)


本教程的重点是展示用户定义的 Lambda 函数如何使用 AWS IoT Greengrass Core SDK 中的`StreamManagerClient`对象与流管理器进行交互。为简单起见，您为本教程创建的 Python Lambda 函数将生成模拟设备数据。

## 先决条件
<a name="stream-manager-console-prerequisites"></a>

要完成此教程，需要：<a name="stream-manager-howto-prereqs"></a>
+ Greengrass 组和 Greengrass Core（v1.10 或更高版本）。有关如何创建 Greengrass 组和核心的信息，请参阅 [入门 AWS IoT Greengrass](gg-gs.md)。入门教程还包括安装 AWS IoT Greengrass 核心软件的步骤。
**注意**  <a name="stream-manager-not-supported-openwrt"></a>
<a name="stream-manager-not-supported-openwrt-para"></a> OpenWrt 发行版不支持直播管理器。
+ 核心设备上安装的 Java 8 运行时 (JDK 8)。<a name="install-java8-runtime-general"></a>
  + 对于基于 Debian 的发行版（包括 Raspbian）或基于 Ubuntui 的发行版，运行以下命令：

    ```
    sudo apt install openjdk-8-jdk
    ```
  + 对于基于 Red Hat 的发行版（包括 Amazon Linux），请运行以下命令：

    ```
    sudo yum install java-1.8.0-openjdk
    ```

    有关更多信息，请参阅 OpenJDK 文档中的[如何下载并安装预先构建的 OpenJDK 程序包](https://openjdk.java.net/install/)。
+ AWS IoT Greengrass 适用于 Python 的核心开发工具包 v1.5.0 或更高版本。要在适用于 Python 的 AWS IoT Greengrass Core 软件开发工具包中使用 `StreamManagerClient`，您必须：
  + 在核心设备上安装 Python 3.7 或更高版本。
  + 将开发工具包和其依赖项包含在 Lambda 函数部署程序包中。本教程中提供了说明。
**提示**  
可以将 `StreamManagerClient` 与 Java 或 NodeJS 结合使用。有关示例代码，请参阅适用于 [Java 的AWS IoT Greengrass Core SDK](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) 和[适用于 Node.js 的AWS IoT Greengrass 酷睿 SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js) GitHub。
+ 在 Amazon Kinesis Data Streams 中**MyKinesisStream**创建的目标流，名称与你的 Greengrass 群组 AWS 区域 相同。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#create-stream)。
**注意**  
在本教程中，流管理器将数据导出到 Kinesis Data Streams，这将向您的 AWS 账户账户收取费用。有关定价的信息，请参阅 [Kinesis Data Streams 定价](https://aws.amazon.com/kinesis/data-streams/pricing/)。  
为避免产生费用，您可以在不创建 Kinesis 数据流的情况下运行本教程。在这种情况下，您检查日志以查看流管理器试图将流导出到 Kinesis Data Streams。
+ 一个添加到了 `kinesis:PutRecords` 的 IAM policy，该策略允许对目标数据流执行 [Greengrass 组角色](group-role.md) 操作，如以下示例所示：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/MyKinesisStream"
              ]
          }
      ]
  }
  ```

------

本教程包含以下概括步骤：

1. [创建 Lambda 函数部署程序包](#stream-manager-console-create-deployment-package)

1. [创建 Lambda 函数](#stream-manager-console-create-function)

1. [将函数添加到组](#stream-manager-console-create-gg-function)

1. [启用流管理器](#stream-manager-console-enable-stream-manager)

1. [配置本地日志记录](#stream-manager-console-configure-logging)

1. [部署组](#stream-manager-console-create-deployment)

1. [测试应用程序](#stream-manager-console-test-application)

完成本教程大约需要 20 分钟。

## 步骤 1：创建 Lambda 函数部署程序包
<a name="stream-manager-console-create-deployment-package"></a>

在此步骤中，您创建包含 Python 函数代码和依赖项的 Lambda 函数部署包。您稍后在 AWS Lambda中创建 Lambda 函数时上传此程序包。Lambda 函数使用 AWS IoT Greengrass 核心软件开发工具包来创建本地流并与之交互。

**注意**  
 用户定义的 Lambda 函数必须使用 [AWS IoT Greengrass 核心开发工具包](lambda-functions.md#lambda-sdks-core)与流管理器交互。有关 Greengrass 流管理器的要求的更多信息，请参阅 [Greengrass 流管理器要求](stream-manager.md#stream-manager-requirements)。

1.  下载[适用于 Python 的AWS IoT Greengrass Core 开发工具包](lambda-functions.md#lambda-sdks-core) v1.5.0 或更高版本。

1. <a name="unzip-ggc-sdk"></a>解压缩下载的程序包以获取软件开发工具包。软件开发工具包是 `greengrasssdk` 文件夹。

1. <a name="install-python-sdk-dependencies-stream-manager"></a>安装程序包依赖项以将其包含在 Lambda 函数部署程序包的开发工具包中。<a name="python-sdk-dependencies-stream-manager"></a>

   1. 导航到包含该 `requirements.txt` 文件的开发工具包目录。此文件列出了依赖项。

   1. 安装开发工具包依赖项。例如，运行以下 `pip` 命令将它们安装在当前目录中：

      ```
      pip install --target . -r requirements.txt
      ```

1. 将以下 Python 代码函数保存在名为 `transfer_stream.py` 的本地文件中。
**提示**  
 有关使用 Java 和 NodeJS 的示例代码，请参阅AWS IoT Greengrass 适用于 J [ava 的 Core SDK AWS IoT Greengrass 和适用于 Node.js](https://github.com/aws/aws-greengrass-core-sdk-java/blob/master/samples/StreamManagerKinesis/src/main/java/com/amazonaws/greengrass/examples/StreamManagerKinesis.java) [的酷睿 SDK](https://github.com/aws/aws-greengrass-core-sdk-js/blob/master/greengrassExamples/StreamManagerKinesis/index.js)。 GitHub

   ```
   import asyncio
   import logging
   import random
   import time
   
   from greengrasssdk.stream_manager import (
       ExportDefinition,
       KinesisConfig,
       MessageStreamDefinition,
       ReadMessagesOptions,
       ResourceNotFoundException,
       StrategyOnFull,
       StreamManagerClient,
   )
   
   
   # This example creates a local stream named "SomeStream".
   # It starts writing data into that stream and then stream manager automatically exports  
   # the data to a customer-created Kinesis data stream named "MyKinesisStream". 
   # This example runs forever until the program is stopped.
   
   # The size of the local stream on disk will not exceed the default (which is 256 MB).
   # Any data appended after the stream reaches the size limit continues to be appended, and
   # stream manager deletes the oldest data until the total stream size is back under 256 MB.
   # The Kinesis data stream in the cloud has no such bound, so all the data from this script is
   # uploaded to Kinesis and you will be charged for that usage.
   
   
   def main(logger):
       try:
           stream_name = "SomeStream"
           kinesis_stream_name = "MyKinesisStream"
   
           # Create a client for the StreamManager
           client = StreamManagerClient()
   
           # Try deleting the stream (if it exists) so that we have a fresh start
           try:
               client.delete_message_stream(stream_name=stream_name)
           except ResourceNotFoundException:
               pass
   
           exports = ExportDefinition(
               kinesis=[KinesisConfig(identifier="KinesisExport" + stream_name, kinesis_stream_name=kinesis_stream_name)]
           )
           client.create_message_stream(
               MessageStreamDefinition(
                   name=stream_name, strategy_on_full=StrategyOnFull.OverwriteOldestData, export_definition=exports
               )
           )
   
           # Append two messages and print their sequence numbers
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "ABCDEFGHIJKLMNO".encode("utf-8")),
           )
           logger.info(
               "Successfully appended message to stream with sequence number %d",
               client.append_message(stream_name, "PQRSTUVWXYZ".encode("utf-8")),
           )
   
           # Try reading the two messages we just appended and print them out
           logger.info(
               "Successfully read 2 messages: %s",
               client.read_messages(stream_name, ReadMessagesOptions(min_message_count=2, read_timeout_millis=1000)),
           )
   
           logger.info("Now going to start writing random integers between 0 and 1000 to the stream")
           # Now start putting in random data between 0 and 1000 to emulate device sensor input
           while True:
               logger.debug("Appending new random integer to stream")
               client.append_message(stream_name, random.randint(0, 1000).to_bytes(length=4, signed=True, byteorder="big"))
               time.sleep(1)
   
       except asyncio.TimeoutError:
           logger.exception("Timed out while executing")
       except Exception:
           logger.exception("Exception while running")
   
   
   def function_handler(event, context):
       return
   
   
   logging.basicConfig(level=logging.INFO)
   # Start up this sample code
   main(logger=logging.getLogger())
   ```

1. 将以下项目压缩到名为 `transfer_stream_python.zip` 的文件中。此即 Lambda 函数部署程序包。
   + **transfer\$1stream.py**。应用程序逻辑。
   + **greengrasssdk**。发布 MQTT 消息的 Python Greengrass Lambda 函数所需的库。

     适用于 Python 的 AWS IoT Greengrass 核心 SDK 版本 1.5.0 或更高版本中提供了@@ [流管理器操作](work-with-streams.md)。
   + 你为适用于 Python 的 AWS IoT Greengrass 核心开发工具包安装的依赖项（例如，`cbor2`目录）。

   创建 `zip` 文件时，仅包含这些项目，而不是包含文件夹。

## 第 2 步：创建 Lambda 函数
<a name="stream-manager-console-create-function"></a>

在此步骤中，您将使用 AWS Lambda 控制台创建 Lambda 函数并将其配置为使用您的部署包。接着，发布函数版本并创建别名。

1. 首先，创建 Lambda 函数。

   1. <a name="lambda-console-open"></a>在中 AWS 管理控制台，选择**服务**，然后打开 AWS Lambda 控制台。

   1. <a name="lambda-console-create-function"></a>选择 **创建函数**，然后选择 **从头开始创作**。

   1. 在**基本信息**部分中，使用以下值：
      + 对于**函数名称**，请输入 **TransferStream**。
      + 对于**运行时系统**，选择 **Python 3.7**。
      + 对于**权限**，请保留默认设置。这将创建一个授予基本 Lambda 权限的执行角色。此角色未被使用 AWS IoT Greengrass。

   1. <a name="lambda-console-save-function"></a>在页面底部，选择**创建函数**。

1. 接下来，注册处理程序并上传您的 Lambda 函数部署程序包。

   1. <a name="lambda-console-upload"></a>在**代码**选项卡上的**代码源**下，选择**上传自**。从下拉列表中选择 **.zip 文件**。  
![\[“上传自”下拉列表中突出显示了.zip 文件。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v1/developerguide/images/lra-console/upload-deployment-package.png)

   1. 选择**上传**，然后选择您的 `transfer_stream_python.zip` 部署包。然后，选择**保存**。

   1. <a name="lambda-console-runtime-settings-para"></a>在函数的**代码**选项卡中，在**运行时设置**下选择**编辑**，然后输入以下值。
      + 对于**运行时系统**，选择 **Python 3.7**。
      + 对于**处理程序**，输入 **transfer\$1stream.function\$1handler**。

   1. <a name="lambda-console-save-config"></a>选择**保存**。
**注意**  
 AWS Lambda 主机上的 “**测试**” 按钮不适用于此功能。 AWS IoT Greengrass 核心软件开发工具包不包含在控制台中独立运行 Greengrass Lambda 函数所需的模块。 AWS Lambda 这些模块（例如 `greengrass_common`）是在函数部署到您的 Greengrass 核心之后提供给它们的。

1. 现在，发布 Lambda 函数的第一个版本并创建[此版本的别名](https://docs.aws.amazon.com/lambda/latest/dg/versioning-aliases.html)。
**注意**  
Greengrass 组可以按别名（推荐）或版本引用 Lambda 函数。使用别名，您可以更轻松地管理代码更新，因为您在更新函数代码时，不必更改订阅表或组定义。相反，您只需将别名指向新的函数版本。

   1. <a name="shared-publish-function-version"></a>在**操作**菜单上，选择**发布新版本**。

   1. <a name="shared-publish-function-version-description"></a>对于**版本描述**，输入 **First version**，然后选择**发布**。

   1. 在**TransferStream:1** 配置页面上，从 “**操作**” 菜单中选择 “**创建别名**”。

   1. 在**创建新别名**页面上，使用以下值：
      + 对于**名称**，输入 **GG\$1TransferStream**。
      + 对于**版本**，选择 **1**。
**注意**  
AWS IoT Greengrass **不支持 \$1LATEST 版本的 Lambda 别名。**

   1. 选择**创建**。

现在，您已准备就绪，可以将 Lambda 函数添加到 Greengrass 组。

## 步骤 3：将 Lambda 函数添加到 Greengrass 组
<a name="stream-manager-console-create-gg-function"></a>

在该步骤中，您将 Lambda 函数添加到该组，然后配置其生命周期和环境变量。有关更多信息，请参阅 [使用组特定的配置控制 Greengrass Lambda 函数的执行](lambda-group-config.md)。

1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

1. <a name="group-choose-target-group"></a>选择目标组。

1. <a name="choose-add-lambda"></a>在组配置页面上，选择**Lambda 函数**选项卡。

1. 在**我的 Lambda 函数**部分下，选择**添加**。

1. 在**添加 Lambda 函数**页面上，为您的 Lambda 函数选择 **Lambda 函数**。

1. **对于 **Lambda 版本**，请选择别名:gg\$1。TransferStream**

   现在，配置用于确定 Greengrass 组中 Lambda 函数的行为的属性。

1. 在 **Lambda 函数配置**部分中，进行以下更改：
   + 将 **Memory limit (内存限制)** 设置为 32 MB。
   + 对于**已固定**，选择 **True**。
**注意**  
<a name="long-lived-lambda"></a>*长寿命*（或*固定*）的 Lambda 函数在启动后自动启动，并在 AWS IoT Greengrass 自己的容器中继续运行。这与*按需* Lambda 函数相反，后者在调用时启动，并在没有要运行的任务时停止。有关更多信息，请参阅 [Greengrass Lambda 函数的生命周期配置](lambda-functions.md#lambda-lifecycle)。

1. 选择**添加 Lambda 函数**。

## 步骤 4：启用流管理器
<a name="stream-manager-console-enable-stream-manager"></a>

在此步骤中，您确保启用流管理器。

1. 在组配置页面上，选择**Lambda 函数**选项卡。

1. 在**系统 Lambda 函数**下，选择**流管理器**，然后检查状态。如果禁用，请选择 **Edit (编辑)**。然后，选择 **Enable (启用)** 和 **Save (保存)**。您可以对本教程使用默认参数设置。有关更多信息，请参阅 [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)。

**注意**  <a name="ggstreammanager-function-config-console"></a>
使用控制台启用流管理器并部署组时，流管理器的内存大小默认设置为 4194304 KB (4 GB)。建议您将内存大小设置为至少 128000 KB。

## 步骤 5：配置本地日志记录
<a name="stream-manager-console-configure-logging"></a>

在此步骤中，您将在组中配置 AWS IoT Greengrass 系统组件、用户定义的 Lambda 函数和连接器，以将日志写入核心设备的文件系统。您可以使用日志对可能遇到的任何问题进行故障排除。有关更多信息，请参阅 [使用 AWS IoT Greengrass 日志进行监控](greengrass-logs-overview.md)。

1. <a name="shared-group-settings-local-logs-configuration"></a>在 **Local logs configuration (本地日志配置)** 下，检查是否配置了本地日志记录。

1. <a name="shared-group-settings-local-logs-edit"></a>如果未为 Greengrass 系统组件或用户定义的 Lambda 函数配置日志，请选择**编辑**。

1. <a name="shared-group-settings-local-logs-event-source"></a>选择**用户 Lambda 函数日志级别**和 **Greengrass 系统日志级别**。

1. <a name="shared-group-settings-local-logs-save"></a>保留日志记录级别和磁盘空间限制的默认值，然后选择 **Save (保存)**。

## 步骤 6：部署 Greengrass 组
<a name="stream-manager-console-create-deployment"></a>

将组部署到核心设备。

1. <a name="shared-deploy-group-checkggc"></a>确保 AWS IoT Greengrass 核心正在运行。根据需要在您的 Raspberry Pi 终端中运行以下命令。

   1. 要检查进程守护程序是否正在运行，请执行以下操作：

      ```
      ps aux | grep -E 'greengrass.*daemon'
      ```

      如果输出包含 `root` 的 `/greengrass/ggc/packages/ggc-version/bin/daemon` 条目，则表示进程守护程序正在运行。
**注意**  
路径中的版本取决于 AWS IoT Greengrass 核心设备上安装的 Core 软件版本。

   1. 启动进程守护程序：

      ```
      cd /greengrass/ggc/core/
      sudo ./greengrassd start
      ```

1. <a name="shared-deploy-group-deploy"></a>在组配置页面上，选择**部署**。

1. <a name="shared-deploy-group-ipconfig"></a>

   1. 在 **Lambda 函数**选项卡的**系统 Lambda 函数**部分下，选择 **IP 检测器**，再选择**编辑**。

   1. 在**编辑 IP 检测器设置**对话框中，选择**自动检测和覆盖 MQTT 代理端点**。

   1. 选择**保存**。

      这使得设备可以自动获取核心的连接信息，例如 IP 地址、DNS 和端口号。建议自动检测，但 AWS IoT Greengrass 也支持手动指定的端点。只有在首次部署组时，系统才会提示您选择发现方法。
**注意**  
如果出现提示，请授予创建 [Greengrass 服务角色并将其与当前](service-role.md)角色关联的权限。 AWS 账户 AWS 区域此角色 AWS IoT Greengrass 允许访问您在 AWS 服务中的资源。

      **部署**页面显示了部署时间戳、版本 ID 和状态。完成后，部署的状态应显示为 **已完成**。

      有关问题排查帮助，请参阅[故障排除 AWS IoT Greengrass](gg-troubleshooting.md)。

## 步骤 7：测试应用程序
<a name="stream-manager-console-test-application"></a>

`TransferStream` Lambda 函数生成模拟的设备数据。它将数据写入流管理器导出到目标 Kinesis 数据流的流。

1. <a name="stream-manager-howto-test-open-kinesis-console"></a>在 Amazon Kinesis 控制台的 Kinesi **s 数据流**下，选择。**MyKinesisStream**
**注意**  
如果您在运行教程时没有目标 Kinesis 数据流， [请检查流管理器的日志文件](stream-manager-cli.md#stream-manager-cli-logs) (`GGStreamManager`)。如果它在错误消息中包含 `export stream MyKinesisStream doesn't exist`，则测试成功。此错误意味着服务试图导出到流，但流不存在。

1. <a name="stream-manager-howto-view-put-records"></a>在**MyKinesisStream**页面上，选择**监控**。如果测试成功，您应在 **Put Records (放置记录)** 图表中看到数据。根据您的连接，显示数据可能需要一分钟时间。
**重要**  
测试完成后，删除 Kinesis 数据流以避免产生更多费用。  
运行以下命令以停止 Greengrass 守护程序。这样可以防止核心发送消息，直到您准备好继续测试。  

   ```
   cd /greengrass/ggc/core/
   sudo ./greengrassd stop
   ```

1. 从核心中移除 **TransferStream**Lambda 函数。

   1. <a name="console-gg-groups"></a>**在 AWS IoT 控制台导航窗格的**管理**下，展开 **Greengrass** 设备，然后选择群组 (V1)。**

   1. 在 **Greengrass 组**下，选择您的组。

   1. 在 **Lambdas** 页面上，为**TransferStream**函数选择省略号 (**...**)，然后选择**移除**函数。

   1. 从**操作**中，选择**部署**。

要查看日志记录信息或解决流的问题，请检查日志中的 `TransferStream` 和 `GGStreamManager` 函数。您必须具有读取文件系统 AWS IoT Greengrass 日志的`root`权限。
+ `TransferStream` 将日志条目写入 `greengrass-root/ggc/var/log/user/region/account-id/TransferStream.log`。
+ `GGStreamManager` 将日志条目写入 `greengrass-root/ggc/var/log/system/GGStreamManager.log`。

如果您需要更多故障排除信息，可以将**用户 Lambda 日志**的[日志级别](#stream-manager-console-configure-logging)设置为**调试日志**，然后再次部署该组。

## 另请参阅
<a name="stream-manager-console-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的数据流](stream-manager.md)
+ [配置 AWS IoT Greengrass 直播管理器](configure-stream-manager.md)
+ [StreamManagerClient 用于处理直播](work-with-streams.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ [将数据流导出到 AWS 云 (CLI)](stream-manager-cli.md)