

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# StreamManagerClient 用于处理直播
<a name="work-with-streams"></a>

在 Greengrass 核心设备上运行的用户定义的 Greengrass 组件可以使用流管理器 SDK 中的 `StreamManagerClient` 对象在[流管理器](manage-data-streams.md)中创建流，然后与流进行交互。当组件创建流时，它会定义流的 AWS 云 目的地、优先级以及其他导出和数据保留策略。要将数据发送到流管理器，组件会将数据附加到流中。如果为流定义了导出目标，流管理器会自动导出流。

**注意**  
<a name="stream-manager-clients"></a>通常，流管理器的客户端是用户定义的 Greengrass 组件。如果您的业务案例需要它，您也可以允许在 Greengrass 核心上运行的非组件进程（例如 Docker 容器）与流管理器交互。有关更多信息，请参阅 [客户端身份验证](manage-data-streams.md#stream-manager-security-client-authentication)。

本主题中的代码段向您展示客户端如何调用 `StreamManagerClient` 方式处理流。有关方法及其参数的实现详细信息，请使用指向每个代码片段后面列出的开发工具包参考的链接。

如果您在 Lambda 函数中使用流管理器，则您的 Lambda 函数应该在函数处理程序之外实例化 `StreamManagerClient`。如果在处理程序中进行实例化，该函数每次被调用时都会创建一个 `client` 并连接到流管理器。

**注意**  
如果在处理程序中实例化 `StreamManagerClient`，则必须在 `client` 完成其工作时显式调用 `close()` 方法。否则，`client` 会保持连接打开，并且另一个线程一直运行，直到脚本退出。

`StreamManagerClient` 支持以下操作：
+ [创建消息流](#streammanagerclient-create-message-stream)
+ [附加消息](#streammanagerclient-append-message)
+ [读取消息](#streammanagerclient-read-messages)
+ [列出流](#streammanagerclient-list-streams)
+ [描述消息流](#streammanagerclient-describe-message-stream)
+ [更新消息流](#streammanagerclient-update-message-stream)
+ [删除消息流](#streammanagerclient-delete-message-stream)

## 创建消息流
<a name="streammanagerclient-create-message-stream"></a>

要创建流，用户定义的 Greengrass 组件会调用 create 方法并传入一个 `MessageStreamDefinition` 对象。此对象指定流的唯一名称，并定义当达到最大流大小时，流管理器应如何处理新数据。您可以使用 `MessageStreamDefinition` 及其数据类型（如 `ExportDefinition`、`StrategyOnFull` 和 `Persistence`）来定义其他流属性。这些方法包括：
+ 自动导出的目标 AWS IoT Analytics Kinesis Data Stream AWS IoT SiteWise s 和 Amazon S3 目的地。有关更多信息，请参阅 [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)。
+ 导出优先级。流管理器先导出优先级较高的流，然后导出优先级较低的流。
+ Kinesis Data Streams 和 AWS IoT Analytics目标的最大批处理大小 AWS IoT SiteWise 和批处理间隔。当满足任一条件时，流管理器导出消息。
+ Time-to-live (TTL)。保证流数据可用于处理的时间量。您应确保数据可以在此时间段内使用。这不是删除策略。TTL 期限后可能不会立即删除数据。
+ 流持久性。选择将流保存到文件系统，以便在核心重新启动期间保留数据或将流保存在内存中。
+ 起始序列号。指定要在导出中用作起始消息的消息的序列号。

有关 `MessageStreamDefinition` 的更多信息，请参阅目标语言的开发工具包参考：
+ [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)在 Java 开发工具包中
+ [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)在 Node.js 软件开发工具包中
+ [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)在 Python 软件开发工具包中

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定，或不支持在生产环境中使用。

创建流后，您的 Greengrass 组件可以[将消息附加](#streammanagerclient-append-message)到流中以发送数据以供导出，并从流中[读取消息](#streammanagerclient-read-messages)以进行本地处理。您创建的流数量取决于您的硬件功能和业务案例。一种策略是为 AWS IoT Analytics 或 Kinesis 数据流中的每个目标频道创建一个流，但您可以为一个流定义多个目标。流具有持久的使用寿命。

### 要求
<a name="streammanagerclient-create-message-stream-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-create-message-stream-examples"></a>

以下代码段创建一个名为 `StreamName` 的流。它定义了 `MessageStreamDefinition` 中的流属性和从属的数据类型。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",    # Required.
        max_size=268435456,    # Default is 256 MB.
        stream_segment_size=16777216,    # Default is 16 MB.
        time_to_live_millis=None,    # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,    # Required.
        persistence=Persistence.File,    # Default is File.
        flush_on_write=False,    # Default is false.
        export_definition=ExportDefinition(    # Optional. Choose where/how the stream is exported to the AWS 云.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[创建消息流 \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.create_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)    // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)    // Default is 16 MB.
                    .withTimeToLiveMillis(null)    // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)    // Required.
                    .withPersistence(Persistence.File)    // Default is File.
                    .withFlushOnWrite(false)    // Default is false.
                    .withExportDefinition(    // Optional. Choose where/how the stream is exported to the AWS 云.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[createMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-)\$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
  try {
    await client.createMessageStream(
      new MessageStreamDefinition()
        .withName("StreamName") // Required.
        .withMaxSize(268435456)  // Default is 256 MB.
        .withStreamSegmentSize(16777216)  // Default is 16 MB.
        .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
        .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
        .withPersistence(Persistence.File)  // Default is File.
        .withFlushOnWrite(false)  // Default is false.
        .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 云.
          new ExportDefinition()
            .withKinesis(null)
            .withIotAnalytics(null)
            .withIotSiteWise(null)
            .withS3(null)
        )
    );
  } catch (e) {
    // Properly handle errors.
  }
});
client.onError((err) => {
  // Properly handle connection errors.
  // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[createMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream)\$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

有关配置导出目标的更多信息，请参阅[导出支持的 AWS 云 目标的配置](stream-export-configurations.md)。

## 附加消息
<a name="streammanagerclient-append-message"></a>

要将数据发送到流管理器进行导出，您的 Greengrass 组件会将数据附加到目标流。导出目标决定要传递给此方法的数据类型。

### 要求
<a name="streammanagerclient-append-message-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics 或 Kinesis Data Streams 导出目的地
<a name="streammanagerclient-append-message-blob"></a>

以下代码段将消息附加到名为 `StreamName` 的流。对于我们 AWS IoT Analytics 的 Kinesis Data Streams 目标，你的 Greengrass 组件会附加一大堆数据。

此代码段具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[append\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js 开发工具包参考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise 出口目的地
<a name="streammanagerclient-append-message-sitewise"></a>

以下代码段将消息附加到名为 `StreamName` 的流。对于 AWS IoT SiteWise 目的地，您的 Greengrass 组件会附加一个序列化对象。`PutAssetPropertyValueEntry`有关更多信息，请参阅 [导出到 AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>当您向发送数据时 AWS IoT SiteWise，您的数据必须满足`BatchPutAssetPropertyValue`操作的要求。有关更多信息，请参阅《AWS IoT SiteWise API Reference》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

此代码段具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
    # than 10 minutes in the past. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[追加消息 \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.PutAssetPropertyValueEntry)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
    // than 10 minutes in the past. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK 参考：[附加消息 \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =    new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[追加消息 \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### Amazon S3 导出目标
<a name="streammanagerclient-append-message-export-task"></a>

以下代码段将导出任务附加到名为 `StreamName` 的流。对于 Amazon S3 目标，您的 Greengrass 组件会附加一个序列化 `S3ExportTaskDefinition` 对象，其中包含有关源输入文件和目标 Amazon S3 对象的信息。如果指定的对象不存在，流管理器会为您创建。有关更多信息，请参阅 [导出到 Amazon S3](stream-export-configurations.md#export-streams-to-s3)。

此代码段具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

[Python SDK 参考：[追加消息 \$1 S3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message) ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

[Java SDK 参考：[附加消息 \$1](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) S3 ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

[Node.js SDK 参考：[追加消息 \$1](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) S3 ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

## 读取消息
<a name="streammanagerclient-read-messages"></a>

从流读取消息。

### 要求
<a name="streammanagerclient-read-messages-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-read-messages-examples"></a>

以下代码段读取名为 `StreamName` 的流中的消息。read 方法接受一个可选 `ReadMessagesOptions` 对象，该对象指定要开始读取的序列号、要读取的最小数量和最大数量以及读取消息的超时。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,    # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[阅读消息 \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.ReadMessagesOptions)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[阅读消息 \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[阅读消息 \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages)

------

## 列出流
<a name="streammanagerclient-list-streams"></a>

在流管理器中获取流列表。

### 要求
<a name="streammanagerclient-list-streams-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-list-streams-examples"></a>

以下代码段获取流管理器中的流列表（按名称）。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[list\$1streams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[listStreams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js 开发工具包参考：[listStreams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

## 描述消息流
<a name="streammanagerclient-describe-message-stream"></a>

获取有关流的元数据，包括流定义、大小和导出状态。

### 要求
<a name="streammanagerclient-describe-message-stream-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-describe-message-stream-examples"></a>

以下代码段获取有关名为 `StreamName` 的流的元数据，包括流的定义、大小和导出程序状态。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[describe\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[describeMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[describeMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

## 更新消息流
<a name="streammanagerclient-update-message-stream"></a>

更新现有流的属性。如果流创建后您的要求发生变化，则可能需要更新流。例如：
+ 为 AWS 云 目标添加新的[导出配置](stream-export-configurations.md)。
+ 增加流的最大大小以更改数据的导出或保留方式。例如，将流大小与您在完整设置下的策略相结合，可能会导致数据在流管理器处理之前被删除或拒绝。
+ 暂停然后恢复导出；例如，如果导出任务运行时间较长，而您想对上传数据进行配给。

您的 Greengrass 组件遵循以下高级流程来更新流：

1. [获取流的描述。](#streammanagerclient-describe-message-stream)

1. 更新相应 `MessageStreamDefinition` 和从属对象的目标属性。

1. 传入更新后的 `MessageStreamDefinition`。请务必包含更新后的流的完整对象定义。未定义属性将恢复为默认值。

   可以指定要在导出中用作起始消息的消息的序列号。

### 要求
<a name="streammanagerclient-update-message-streamreqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-update-message-stream-examples"></a>

以下代码段更新名为 `StreamName` 的流。它会更新导出到 Kinesis Data Streams 的流的多个属性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(    
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[updateMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.update_message_stream)\$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)    // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS 云.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java SDK 参考：[update](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) \$1message\$1stream [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)    // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)    // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)    // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)    // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)    // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)    // Default is false. Updating to true.
                .withExportDefinition(    
                    // Optional. Choose where/how the stream is exported to the AWS 云.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[updateMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream)\$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### 更新流时的限制
<a name="streammanagerclient-update-constraints"></a>

更新流时，有以下限制。除非在以下列表中注明，否则更新会立即生效。
+ 无法更新流的持久性。要更改此行为，[请删除该流](#streammanagerclient-delete-message-stream)然后[创建一个流](#streammanagerclient-create-message-stream)以定义新的持久性策略。
+ 只有在以下条件下，您才能更新流的最大大小：
  + 最大大小必须大于或等于流的当前大小。<a name="messagestreaminfo-describe-stream"></a>要查找此信息，请[描述流](#streammanagerclient-describe-message-stream)，然后检查返回的 `MessageStreamInfo` 对象的存储状态。
  + 最大大小必须大于或等于流的段大小。
+ 可以将流的段大小更新为小于流的最大大小的值。更新的设置将应用于新的段。
+ 生存时间（TTL）属性的更新将应用于新的追加操作。如果您减小此值，流管理器也可能会删除超过 TTL 的现有段。
+ 对全属性策略的更新将应用于新的追加操作。如果您将策略设置为覆盖最旧的数据，则流管理器还可能根据新设置覆盖现有段。
+ 写入时刷新属性的更新将应用于新消息。
+ 导出配置的更新将应用于新的导出。更新请求必须包含您想要支持的所有导出配置。否则，流管理器会将其删除。
  + 更新导出配置时，请指定目标导出配置的标识符。
  + 要添加导出配置，请为新的导出配置指定唯一标识符。
  + 要删除导出配置，请省略导出配置。
+ 要[更新](#streammanagerclient-update-message-stream)流中导出配置的起始序列号，必须指定一个小于最新序列号的值。<a name="messagestreaminfo-describe-stream"></a>要查找此信息，请[描述流](#streammanagerclient-describe-message-stream)，然后检查返回的 `MessageStreamInfo` 对象的存储状态。

## 删除消息流
<a name="streammanagerclient-delete-message-stream"></a>

删除流。删除流时，流的所有存储数据将从磁盘中删除。

### 要求
<a name="streammanagerclient-delete-message-stream-reqs"></a>

此操作具有以下要求：
+ <a name="streammanagerclient-min-sm-sdk"></a>流管理器 SDK 最低版本：Python：1.1.0 \$1 Java：1.1.0 \$1 Node.js：1.1.0

### 示例
<a name="streammanagerclient-delete-message-stream-examples"></a>

以下代码段删除名为 `StreamName` 的流。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python 开发工具包参考：[deleteMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[delete\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[deleteMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## 另请参阅
<a name="work-with-streams-see-also"></a>
+ [管理 Greengrass 核心设备上的数据流](manage-data-streams.md)
+ [配置 AWS IoT Greengrass 流管理器](configure-stream-manager.md)
+ [导出支持的 AWS 云 目标的配置](stream-export-configurations.md)
+ 流管理器 SDK 参考中的 `StreamManagerClient`：
  + [Python](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html)
  + [Java](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# 导出支持的 AWS 云 目标的配置
<a name="stream-export-configurations"></a>

用户定义的 Greengrass 组件在流管理器 SDK 中使用 `StreamManagerClient` 与流管理器交互。当组件[创建流](work-with-streams.md#streammanagerclient-create-message-stream)或[更新流](work-with-streams.md#streammanagerclient-create-message-stream)时，它会传递一个表示流属性的 `MessageStreamDefinition` 对象，包括导出定义。`ExportDefinition` 对象包含为流定义的导出配置。流管理器使用这些导出配置来确定将流导出到何处以及如何导出。

![\[ExportDefinition 属性类型的对象模型图。\]](http://docs.aws.amazon.com/zh_cn/greengrass/v2/developerguide/images/stream-manager-exportconfigs.png)


您可以为一个流定义零个或多个导出配置，包括针对单个目标类型的多个导出配置。例如，您可以将一个流导出到两个 AWS IoT Analytics 通道和一个 Kinesis 数据流。

对于失败的导出尝试，流管理器会持续重试将数据导出到，间隔不超过五分钟。 AWS 云 重试次数没有最大限制。

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 还提供了一个可用于将流导出到 HTTP 服务器的目标。此目标仅用于测试目的。其不稳定，或不支持在生产环境中使用。

**Topics**
+ [AWS IoT Analytics 频道](#export-to-iot-analytics)
+ [Amazon Kinesis data streams](#export-to-kinesis)
+ [AWS IoT SiteWise 资产属性](#export-to-iot-sitewise)
+ [Amazon S3 对象](#export-to-s3)

您有责任维护这些 AWS 云 资源。

## AWS IoT Analytics 频道
<a name="export-to-iot-analytics"></a>

直播管理器支持自动导出到 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 允许您对数据进行高级分析，以帮助做出业务决策和改进机器学习模型。有关更多信息，请参阅[什么是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html) 在《*AWS IoT Analytics 用户指南》*中。

在流管理器 SDK 中，您的 Greengrass 组件使用 `IoTAnalyticsConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python SDK 中的 [Io TAnalytics Config](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTAnalyticsConfig)
+ Java SDK 中的 [Io TAnalytics Config](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html)
+ Node.js SDK 中的 [Io TAnalytics Config](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html)

### 要求
<a name="export-to-iot-analytics-reqs"></a>

此导出目的地具有以下要求：
+ 中的目标频道 AWS IoT Analytics 必须与 Greengrass 核心设备相同 AWS 账户 。 AWS 区域 
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对目标通道的 `iotanalytics:BatchPutMessage` 权限。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "iotanalytics:BatchPutMessage"
        ],
        "Resource": [
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

要创建导出到的流 AWS IoT Analytics，Greengrass [组件会创建一个包含一个](work-with-streams.md#streammanagerclient-create-message-stream)或多个对象的导出定义的流。`IoTAnalyticsConfig`此对象定义导出设置，例如目标频道、批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收数据时，它们会[将包含大量数据的消息附加](work-with-streams.md#streammanagerclient-append-message)到目标流。

然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

## Amazon Kinesis data streams
<a name="export-to-kinesis"></a>

流管理器支持自动导出到 Amazon Kinesis Data Streams。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用于聚合大量数据并将其加载到数据仓库 MapReduce 或集群中。有关更多信息，请参阅 *Amazon Kinesis 开发人员指南*中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。

在流管理器 SDK 中，您的 Greengrass 组件使用 `KinesisConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.KinesisConfig)在 Python 软件开发工具包中
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html)在 Java 开发工具包中
+ [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html)在 Node.js SDK 中

### 要求
<a name="export-to-kinesis-reqs"></a>

此导出目的地具有以下要求：
+ Kinesis Data Streams 中的目标流必须与 Greengrass 核心设备 AWS 账户 相同 AWS 区域 。
+ （推荐）流管理器 v2.2.1 提高了将流导出至 Kinesis Data Streams 目的地的性能。要使用该最新版本中的改进之处，请将[流管理器组件](stream-manager-component.md)升级至 v2.2.1，然后使用 [Greengrass 令牌交换角色](device-service-role.md)中的 `kinesis:ListShards` 策略。
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对数据流的 `kinesis:PutRecords` 权限。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "kinesis:PutRecords"
        ],
        "Resource": [
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

为创建导出到 Kinesis Data Streams 的流，您的 Greengrass 组件会使用包含一个或多个 `KinesisConfig` 对象的导出定义[创建流](work-with-streams.md#streammanagerclient-create-message-stream)。此对象定义导出设置，例如目标数据流、批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收数据时，它们会[将包含大量数据的消息附加](work-with-streams.md#streammanagerclient-append-message)到目标流。然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

流管理器会为上传到 Amazon Kinesis 的每条记录生成一个唯一的随机 UUID 作为分区键。

## AWS IoT SiteWise 资产属性
<a name="export-to-iot-sitewise"></a>

直播管理器支持自动导出到 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 允许您大规模收集、组织和分析来自工业设备的数据。有关更多信息，请参阅[什么是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html) 在《*AWS IoT SiteWise 用户指南》*中。

在流管理器 SDK 中，您的 Greengrass 组件使用 `IoTSiteWiseConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python SDK TSite WiseConfig 中的 [Io](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTSiteWiseConfig)
+ Java SDK TSite WiseConfig 中的 [Io](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html)
+ Node.js SDK TSite WiseConfig 中的 [Io](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html)

**注意**  
AWS 还提供了 AWS IoT SiteWise 组件，这些组件提供了一个预先构建的解决方案，可用于流式传输来自 OPC-UA 来源的数据。有关更多信息，请参阅 [物联网 SiteWise OPC UA 采集器](iotsitewise-opcua-collector-component.md)。

### 要求
<a name="export-to-iot-sitewise-reqs"></a>

此导出目的地具有以下要求：
+ 中的目标资产属性 AWS IoT SiteWise 必须与 Greengrass 核心设备相同 AWS 账户 。 AWS 区域 
**注意**  
有关 AWS IoT SiteWise 支持的列表，请参阅《 AWS 区域*AWS 一般参考*》中的[AWS IoT SiteWise 终端节点和配额](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region)。
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对目标资产属性的 `iotsitewise:BatchPutAssetPropertyValue` 权限。以下示例策略使用 `iotsitewise:assetHierarchyPath` 条件键来授予对目标根资产及其子项的访问权限。您可以`Condition`从策略中删除，以允许访问您的所有 AWS IoT SiteWise 资产或指定单个 ARNs 资产。

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
         "Effect": "Allow",
         "Action": "iotsitewise:BatchPutAssetPropertyValue",
         "Resource": "*",
         "Condition": {
           "StringLike": {
             "iotsitewise:assetHierarchyPath": [
               "/root node asset ID",
               "/root node asset ID/*"
             ]
           }
         }
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

  有关重要的安全信息，请参阅《*AWS IoT SiteWise 用户指南*》中的[ BatchPutAssetPropertyValue 授权](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action)。

### 导出到 AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

要创建导出到的流 AWS IoT SiteWise，Greengrass [组件会创建一个包含一个](work-with-streams.md#streammanagerclient-create-message-stream)或多个对象的导出定义的流。`IoTSiteWiseConfig`此对象定义导出设置，例如批次大小、批次间隔和优先级。

当您的 Greengrass 组件从设备接收资产属性数据时，它们会将包含数据的消息附加到目标流。消息是 JSON 序列化的 `PutAssetPropertyValueEntry` 对象，其中包含一个或多个资产属性的属性值。有关更多信息，请参阅为 AWS IoT SiteWise 导出目标[追加消息](work-with-streams.md#streammanagerclient-append-message-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>当您向发送数据时 AWS IoT SiteWise，您的数据必须满足`BatchPutAssetPropertyValue`操作的要求。有关更多信息，请参阅《AWS IoT SiteWise API Reference》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

然后，流管理器根据流的导出配置中定义的批处理设置和优先级导出数据。

您可以调整流管理器设置和 Greengrass 组件逻辑来设计您的导出策略。例如：
+ 对于近乎实时的导出，请设置较低的批量大小和间隔设置，并在收到数据时将数据追加到流中。
+ 为了优化批处理、缓解带宽限制或最大限度地降低成本，Greengrass 组件可以在 timestamp-quality-value将数据追加到流之前，为单个资产属性汇集接收到的 (TQV) 数据点。一种策略是在一条消息中批量输入多达 10 种不同的财产资产组合或属性别名，而不是为同一个属性发送多个条目。这有助于流管理器保持在[AWS IoT SiteWise 配额](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html)范围内。

## Amazon S3 对象
<a name="export-to-s3"></a>

流管理器支持自动导出到 Amazon S3。<a name="s3-export-destination"></a>您可以使用 Amazon S3 存储和检索大量的数据。有关更多信息，请参阅 [Amazon Simple Storage Service 开发人员指南](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)中的*什么是 Amazon S3？*。

在流管理器 SDK 中，您的 Greengrass 组件使用 `S3ExportTaskExecutorConfig` 来定义此目标类型的导出配置。有关更多信息，请参阅目标语言的开发工具包参考：
+ Python 软件开发工具包ExportTaskExecutorConfig中的 S@@ [3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskExecutorConfig)
+ Java 开发工具包ExportTaskExecutorConfig中的 [S3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html)
+ Node.js 软件开发工具包ExportTaskExecutorConfig中的 [S3](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html)

### 要求
<a name="export-to-s3-reqs"></a>

此导出目的地具有以下要求：
+ 目标 Amazon S3 存储桶必须与 Greengrass 核心设备 AWS 账户 相同。
+ 如果在 **Greengrass 容器**模式下运行的 Lambda 函数将输入文件写入输入文件目录，您必须将该目录作为卷挂载到具有写入权限的容器中。这样可以确保将文件写入根文件系统，并对容器外部运行的流管理器组件可见。
+ 如果 Docker 容器组件将输入文件写入输入文件目录，您必须将该目录作为卷挂载到具有写入权限的容器中。这样可以确保将文件写入根文件系统，并对容器外部运行的流管理器组件可见。
+ [授权核心设备与 AWS 服务交互](device-service-role.md) 必须允许对目标存储桶具有以下权限。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:AbortMultipartUpload",
          "s3:ListMultipartUploadParts"
        ],
        "Resource": [
          "arn:aws:s3:::bucket-1-name/*",
          "arn:aws:s3:::bucket-2-name/*"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以授予对资源的具体或条件访问权限（例如，通过使用通配符 `*` 命名方案）。有关更多信息，请参阅 *IAM 用户指南*中的[添加和删除 IAM policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。

### 导出到 Amazon S3
<a name="export-streams-to-s3"></a>

为创建导出到 Amazon S3 的流，您的 Greengrass 组件使用 `S3ExportTaskExecutorConfig` 对象来配置导出策略。该策略定义了导出设置，例如分段上传阈值和优先级。对于 Amazon S3 导出，流管理器会上传它从核心设备上的本地文件中读取的数据。要启动上传，您的 Greengrass 组件会将导出任务附加到目标流。导出任务包含有关输入文件和目标 Amazon S3 对象的信息。流管理器按照任务附加到流中的顺序运行任务。

**注意**  
 <a name="bucket-not-key-must-exist"></a>目标存储桶必须已经存在于您的中 AWS 账户。如果指定密钥的对象不存在，则流管理器会为您创建该对象。

Stream Manager 使用分段上传阈值属性、[最小分段大小](configure-stream-manager.md#stream-manager-minimum-part-size)设置和输入文件的大小来确定如何上传数据。分段上传阈值必须大于或等于最小分段大小。如果要并行上传数据，则可以创建多个流。

指定您的目标 Amazon S3 对象的密钥可以在`!{timestamp:value}`占位符中包含有效的 [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 字符串。您可以使用这些时间戳占位符根据输入文件数据的上传时间对 Amazon S3 中的数据进行分区。例如，以下键名解析为诸如 `my-key/2020/12/31/data.txt` 之类的值。

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**注意**  
如果要监控流的导出状态，请先创建一个状态流，然后将导出流配置为使用该状态流。有关更多信息，请参阅 [监控导出任务](#monitor-export-status-s3)。

#### 管理输入数据
<a name="manage-s3-input-data"></a>

您可以编写代码，供物联网应用用来管理输入数据的生命周期。以下示例工作流程显示了如何使用 Greengrass 组件来管理这些数据。

1. 本地进程从设备或外围设备接收数据，然后将数据写入核心设备上目录中的文件。这些是流管理器的输入文件。

1. Greengrass 组件会扫描该目录，并在创建新文件时[将导出任务附加](work-with-streams.md#streammanagerclient-append-message-export-task)到目标流。该任务是一个 JSON 序列化 `S3ExportTaskDefinition` 对象，用于指定输入文件的 URL、目标 Amazon S3 存储桶和密钥以及可选的用户元数据。

1. 流管理器读取输入文件并按照附加任务的顺序将数据导出到 Amazon S3。<a name="bucket-not-key-must-exist"></a>目标存储桶必须已经存在于您的中 AWS 账户。如果指定密钥的对象不存在，则流管理器会为您创建该对象。

1. Greengrass 组件从状态流[读取消息](work-with-streams.md#streammanagerclient-read-messages)以监控导出状态。导出任务完成后，Greengrass 组件可以删除相应的输入文件。有关更多信息，请参阅 [监控导出任务](#monitor-export-status-s3)。

### 监控导出任务
<a name="monitor-export-status-s3"></a>

您可以编写代码，让物联网应用程序监控您的 Amazon S3 导出状态。您的 Greengrass 组件必须创建状态流，然后配置导出流以将状态更新写入状态流。单个状态流可以接收来自导出到 Amazon S3 的多个流的状态更新。

首先，[创建一个流](work-with-streams.md#streammanagerclient-create-message-stream)以用作状态流。您可以为流配置大小和保留策略，以控制状态消息的生命周期。例如：
+ 如果您不想存储状态消息，请将 `Persistence` 设置为 `Memory`。
+ 将 `StrategyOnFull` 设置为 `OverwriteOldestData`，这样新的状态消息就不会丢失。

然后，创建或更新导出流以使用状态流。具体而言，设置流 `S3ExportTaskExecutorConfig` 导出配置的状态配置属性。此设置会告诉流管理器将有关导出任务的状态消息写入状态流。在 `StatusConfig` 对象中，指定状态流的名称和详细程度。以下支持的值范围从最低 verbose (`ERROR`) 到最长 verbose (`TRACE`) 不等。默认值为 `INFO`。
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

以下示例工作流程显示了 Greengrass 组件如何使用状态流来监控导出状态。

1. 如前面的工作流程所述，Greengrass 组件[将导出任务附加](work-with-streams.md#streammanagerclient-append-message-export-task)到流，后者配置为将有关导出任务的状态消息写入状态流。附加操作返回一个表示任务 ID 的序列号。

1. Greengrass 组件按顺序[读取](work-with-streams.md#streammanagerclient-read-messages)状态流中的消息，然后根据流名称和任务 ID 或消息上下文中的导出任务属性筛选消息。例如，Greengrass 组件可以按导出任务的输入文件 URL 进行筛选，该文件由消息上下文中的 `S3ExportTaskDefinition` 对象表示。

   以下状态代码指示导出任务已达到完成状态：
   + `Success`。上传已成功完成。
   + `Failure`。流管理器遇到错误，例如，指定的存储桶不存在。解决问题后，您可以再次将导出任务追加到流中。
   + `Canceled`。 由于流或导出定义已删除，或者任务的 time-to-live (TTL) 期限已过期，该任务已停止。
**注意**  
该任务的状态也可能为 `InProgress` 或 `Warning`。当事件返回不影响任务执行的错误时，流管理器会发出警告。例如，如果无法清理部分上传，则会返回警告。

1. 导出任务完成后，Greengrass 组件可以删除相应的输入文件。

以下示例显示 Greengrass 组件如何读取和处理状态消息。

------
#### [ Python ]

```
import time
from stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 参考：[阅读消息 \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.StatusMessage)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

 try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 开发工具包参考：[阅读消息 \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require(*'aws-greengrass-stream-manager-sdk'*);

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 参考：[阅读消息 \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages)

------