

支援終止通知：2026 年 10 月 7 日 AWS 將停止 的支援 AWS IoT Greengrass Version 1。2026 年 10 月 7 日之後，您將無法再存取 AWS IoT Greengrass V1 資源。如需詳細資訊，請造訪[從 遷移 AWS IoT Greengrass Version 1](https://docs.aws.amazon.com/greengrass/v2/developerguide/migrate-from-v1.html)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 StreamManagerClient 搭配串流
<a name="work-with-streams"></a>

在 AWS IoT Greengrass 核心上執行的使用者定義 Lambda 函數可以使用[AWS IoT Greengrass 核心 SDK](lambda-functions.md#lambda-sdks) 中的 `StreamManagerClient` 物件，在串流[管理員](stream-manager.md)中建立串流，然後與串流互動。當 Lambda 函數建立串流時，它會定義串流的 AWS 雲端 目的地、優先順序和其他匯出和資料保留政策。若要將資料傳送至串流管理員，Lambda 函數會將資料附加至串流。如果為串流定義匯出目的地，串流管理員會自動匯出串流。

**注意**  
<a name="stream-manager-clients"></a>一般而言，串流管理員的用戶端是使用者定義的 Lambda 函數。如果您的商業案例需要，您也可以允許在 Greengrass 核心 （例如 Docker 容器） 上執行的非 Lambda 程序與串流管理員互動。如需詳細資訊，請參閱[用戶端身分驗證](stream-manager.md#stream-manager-security-client-authentication)。

本主題中的程式碼片段會示範用戶端如何呼叫`StreamManagerClient`方法以使用串流。如需方法及其引數的實作詳細資訊，請使用每個程式碼片段後列出的 SDK 參考連結。如需包含完整 Python Lambda 函數的教學課程，請參閱 [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)或 [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)。

您的 Lambda 函數應該在函數處理常式`StreamManagerClient`之外執行個體化。如果在處理常式內執行個體化，則該函數在每次叫用時都會建立 `client` 以及與串流管理員的連線。

**注意**  
如果您在處理常式內將 `StreamManagerClient` 執行個體化，則必須在 `client` 完成其工作時明確呼叫 `close()` 方法。否則，`client` 會將連線保持在開啟狀態，並將另一個執行緒保持在執行狀態，直到指令碼結束為止。

`StreamManagerClient` 支援下列操作：
+ [建立訊息串流](#streammanagerclient-create-message-stream)
+ [附加訊息](#streammanagerclient-append-message)
+ [讀取訊息](#streammanagerclient-read-messages)
+ [列出串流](#streammanagerclient-list-streams)
+ [描述訊息串流](#streammanagerclient-describe-message-stream)
+ [更新訊息串流](#streammanagerclient-update-message-stream)
+ [刪除訊息串流](#streammanagerclient-delete-message-stream)

## 建立訊息串流
<a name="streammanagerclient-create-message-stream"></a>

若要建立串流，使用者定義的 Lambda 函數會呼叫建立方法並傳入`MessageStreamDefinition`物件。此物件會指定串流的唯一名稱，並定義串流管理員在達到串流大小上限時應如何處理新資料。您可以使用 `MessageStreamDefinition` 及其資料類型 (例如 `ExportDefinition`、`StrategyOnFull` 和 `Persistence`) 來定義其他串流屬性。其中包含：
+ 自動匯出的目標 AWS IoT SiteWise、 AWS IoT Analytics Kinesis Data Streams 和 Amazon S3 目的地。如需詳細資訊，請參閱[匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)。
+ 匯出優先順序。串流管理員會先匯出優先順序較高的串流，然後再匯出較低的串流。
+  AWS IoT Analytics、Kinesis Data Streams 和目的地的批次大小上限和 AWS IoT SiteWise 批次間隔。符合任一條件時，串流管理員會匯出訊息。
+ 存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後，資料可能不會立即刪除。
+ 串流持久性。選擇此選項可將串流儲存至檔案系統，以便在核心重新啟動期間保留資料，或將串流儲存在記憶體中。
+ 起始序號。指定要用作匯出中起始訊息的訊息序號。

如需 的詳細資訊`MessageStreamDefinition`，請參閱目標語言的 SDK 參考：
+ Java 開發套件中的 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html) 
+ Node.js SDK 中的 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html) 
+ Python SDK 中的 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition) 

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 也提供目標目的地，可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。

建立串流後，您的 Lambda 函數可以將[訊息附加](#streammanagerclient-append-message)至串流，以傳送資料進行匯出，並從串流[讀取訊息](#streammanagerclient-append-message)以進行本機處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一個策略是為 AWS IoT Analytics 或 Kinesis 資料串流中的每個目標頻道建立串流，但您可以定義串流的多個目標。串流的生命週期相當耐久。

### 要求
<a name="streammanagerclient-create-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

**注意**  
使用 AWS IoT SiteWise 或 Amazon S3 匯出目的地建立串流有下列需求：  
<a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

### 範例
<a name="streammanagerclient-create-message-stream-examples"></a>

以下程式碼片段會建立名為 `StreamName` 的串流。它定義 `MessageStreamDefinition`和次級資料類型中的串流屬性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",  # Required.
        max_size=268435456,  # Default is 256 MB.
        stream_segment_size=16777216,  # Default is 16 MB.
        time_to_live_millis=None,  # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        persistence=Persistence.File,  # Default is File.
        flush_on_write=False,  # Default is false.
        export_definition=ExportDefinition(  # Optional. Choose where/how the stream is exported to the AWS 雲端.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[create\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.create_message_stream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)  // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)  // Default is 16 MB.
                    .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                    .withPersistence(Persistence.File)  // Default is File.
                    .withFlushOnWrite(false)  // Default is false.
                    .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 雲端.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3TaskExecutor(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.createMessageStream(
            new MessageStreamDefinition()
                .withName("StreamName") // Required.
                .withMaxSize(268435456)  // Default is 256 MB.
                .withStreamSegmentSize(16777216)  // Default is 16 MB.
                .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
                .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
                .withPersistence(Persistence.File)  // Default is File.
                .withFlushOnWrite(false)  // Default is false.
                .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 雲端.
                    new ExportDefinition()
                        .withKinesis(null)
                        .withIotAnalytics(null)
                        .withIotSitewise(null)
                        .withS3TaskExecutor(null)
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[createMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

如需設定匯出目的地的詳細資訊，請參閱 [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)。

 

## 附加訊息
<a name="streammanagerclient-append-message"></a>

若要將資料傳送至串流管理員以進行匯出，您的 Lambda 函數會將資料附加至目標串流。匯出目的地會決定要傳遞至此方法的資料類型。

### 要求
<a name="streammanagerclient-append-message-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

**注意**  
使用 AWS IoT SiteWise 或 Amazon S3 匯出目的地附加訊息有下列需求：  
<a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
<a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

### 範例
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics 或 Kinesis Data Streams 匯出目的地
<a name="streammanagerclient-append-message-blob"></a>

下列程式碼片段附加一個訊息到名為 `StreamName` 的串流。對於 AWS IoT Analytics 或 Kinesis Data Streams 目的地，您的 Lambda 函數會附加資料的 Blob。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise 匯出目的地
<a name="streammanagerclient-append-message-sitewise"></a>

下列程式碼片段附加一個訊息到名為 `StreamName` 的串流。對於 AWS IoT SiteWise 目的地，您的 Lambda 函數會附加序列化`PutAssetPropertyValueEntry`物件。如需詳細資訊，請參閱[匯出至 AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>當您傳送資料至 時 AWS IoT SiteWise，您的資料必須符合 `BatchPutAssetPropertyValue`動作的要求。如需詳細資訊，請參閱《AWS IoT SiteWise API 參考》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.PutAssetPropertyValueEntry)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =  new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [PutAssetPropertyValueEntry](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)

------

#### Amazon S3 匯出目的地
<a name="streammanagerclient-append-message-export-task"></a>

下列程式碼片段會將匯出任務附加至名為 的串流`StreamName`。對於 Amazon S3 目的地，您的 Lambda 函數會附加序列化`S3ExportTaskDefinition`物件，其中包含來源輸入檔案和目標 Amazon S3 物件的相關資訊。如果指定的物件不存在，串流管理員會為您建立它。如需詳細資訊，請參閱[匯出至 Amazon S3](stream-export-configurations.md#export-streams-to-s3)。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [S3ExportTaskDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

 

## 讀取訊息
<a name="streammanagerclient-read-messages"></a>

從串流讀取訊息。

### 要求
<a name="streammanagerclient-read-messages-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-read-messages-examples"></a>

下列程式碼片段可從名為 `StreamName` 的串流讀取訊息。讀取方法需要一個選用的 `ReadMessagesOptions` 物件以指定序號，從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,  # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[Read\$1messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.ReadMessagesOptions)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [ReadMessagesOptions](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)

------

 

## 列出串流
<a name="streammanagerclient-list-streams"></a>

取得串流管理員中的串流清單。

### 要求
<a name="streammanagerclient-list-streams-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-list-streams-examples"></a>

下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[list\$1streams](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[listStreams](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[listStreams](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

 

## 描述訊息串流
<a name="streammanagerclient-describe-message-stream"></a>

取得串流的中繼資料，包括串流定義、大小和匯出狀態。

### 要求
<a name="streammanagerclient-describe-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-describe-message-stream-examples"></a>

下列程式碼片段可獲取名為 `StreamName` 的串流相關的中繼資料，包括串流定義、大小和匯出工具狀態。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考： [describe\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考： [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考： [describeMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

 

## 更新訊息串流
<a name="streammanagerclient-update-message-stream"></a>

更新現有串流的屬性。如果您的需求在建立串流之後有所變更，建議您更新串流。例如：
+ 新增 AWS 雲端 目的地的新[匯出組態](stream-export-configurations.md)。
+ 增加串流的大小上限，以變更資料匯出或保留的方式。例如，串流大小結合您在完整設定上的策略，可能會導致資料遭到刪除或拒絕，然後串流管理員才能處理資料。
+ 暫停和繼續匯出；例如，如果匯出任務長時間執行，而且您想要對上傳資料進行評價。

您的 Lambda 函數遵循此高階程序來更新串流：

1. [取得串流的描述。](#streammanagerclient-describe-message-stream)

1. 更新對應`MessageStreamDefinition`和次級物件的目標屬性。

1. 在更新的 中傳遞 `MessageStreamDefinition`。請務必包含更新串流的完整物件定義。未定義的屬性會還原為預設值。

   您可以指定要用作匯出中起始訊息的訊息序號。

### 要求
<a name="-streammanagerclient-update-message-streamreqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心版本：1.11.0
+ <a name="streammanagerclient-min-sdk-ggc-1.11.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.6.0  \$1  Java：1.5.0  \$1  Node.js：1.7.0

### 範例
<a name="streammanagerclient-update-message-stream-examples"></a>

下列程式碼片段會更新名為 的串流`StreamName`。它會更新匯出至 Kinesis Data Streams 之串流的多個屬性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(  
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.update_message_stream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)  // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS 雲端.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[update\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)  // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)  // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)  // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)  // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)  // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)  // Default is false. Updating to true.
                .withExportDefinition(  
                    // Optional. Choose where/how the stream is exported to the AWS 雲端.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[updateMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream) \$1 [MessageStreamDefinition](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### 更新串流的限制
<a name="streammanagerclient-update-constraints"></a>

更新串流時，適用下列限制條件。除非以下清單中另有說明，否則更新會立即生效。
+ 您無法更新串流的持久性。若要變更此行為，[請刪除串流](#streammanagerclient-delete-message-stream)並[建立定義新持久性政策的串流](#streammanagerclient-create-message-stream)。
+ 您只能在下列情況下更新串流的大小上限：
  + 大小上限必須大於或等於串流的目前大小。<a name="messagestreaminfo-describe-stream"></a>若要尋找此資訊，[請描述串流](#streammanagerclient-describe-message-stream)，然後檢查傳回`MessageStreamInfo`物件的儲存狀態。
  + 大小上限必須大於或等於串流的區段大小。
+ 您可以將串流區段大小更新為小於串流大小上限的值。更新的設定會套用至新的客群。
+ 存留時間 (TTL) 屬性的更新適用於新的附加操作。如果您減少此值，串流管理員也可能會刪除超過 TTL 的現有區段。
+ 完整屬性策略的更新適用於新的附加操作。如果您設定策略來覆寫最舊的資料，串流管理員也可能會根據新設定覆寫現有的區段。
+ 寫入屬性排清的更新會套用至新訊息。
+ 匯出組態的更新會套用至新的匯出。更新請求必須包含您要支援的所有匯出組態。否則，串流管理員會刪除它們。
  + 當您更新匯出組態時，請指定目標匯出組態的識別符。
  + 若要新增匯出組態，請指定新匯出組態的唯一識別符。
  + 若要刪除匯出組態，請省略匯出組態。
+ 若要[更新](#streammanagerclient-update-message-stream)串流中匯出組態的起始序號，您必須指定小於最新序號的值。<a name="messagestreaminfo-describe-stream"></a>若要尋找此資訊，[請描述串流](#streammanagerclient-describe-message-stream)，然後檢查傳回`MessageStreamInfo`物件的儲存狀態。

 

## 刪除訊息串流
<a name="streammanagerclient-delete-message-stream"></a>

刪除串流。刪除串流時，磁碟中該串流的所有儲存資料都會刪除。

### 要求
<a name="streammanagerclient-delete-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心版本：1.10.0
+ <a name="streammanagerclient-min-sdk-ggc-1.10.0"></a>最低 AWS IoT Greengrass 核心 SDK 版本：Python：1.5.0  \$1  Java：1.4.0  \$1  Node.js：1.6.0

### 範例
<a name="streammanagerclient-delete-message-stream-examples"></a>

下列程式碼片段會刪除名為 `StreamName` 的串流。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[Delete\$1message\$1stream](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[deleteMessageStream](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## 另請參閱
<a name="work-with-streams-see-also"></a>
+ [管理 AWS IoT Greengrass 核心上的資料串流](stream-manager.md)
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ [將資料串流匯出至 AWS 雲端 （主控台）](stream-manager-console.md)
+ [將資料串流匯出至 AWS 雲端 (CLI)](stream-manager-cli.md)
+ `StreamManagerClient` AWS IoT Greengrass 核心 SDK 參考中的 ：
  + [Python](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html)
  + [Java](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# 匯出支援 AWS 雲端 目的地的組態
<a name="stream-export-configurations"></a>

使用者定義的 Lambda 函數使用 AWS IoT Greengrass 核心 SDK `StreamManagerClient`與串流管理員互動。當 Lambda [函數建立串流](work-with-streams.md#streammanagerclient-create-message-stream)或[更新串流](work-with-streams.md#streammanagerclient-create-message-stream)時，它會傳遞代表串流屬性的`MessageStreamDefinition`物件，包括匯出定義。`ExportDefinition` 物件包含為串流定義的匯出組態。串流管理員使用這些匯出組態來判斷匯出串流的位置和方式。

![\[ExportDefinition 屬性類型的物件模型圖表。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-exportconfigs.png)


您可以在串流上定義零或多個匯出組態，包括單一目的地類型的多個匯出組態。例如，您可以將串流匯出至兩個 AWS IoT Analytics 頻道和一個 Kinesis 資料串流。

對於失敗的匯出嘗試，串流管理員會持續重試 AWS 雲端 以最多五分鐘的間隔將資料匯出至 。重試嘗試次數沒有上限。

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 也提供目標目的地，可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。

**Topics**
+ [AWS IoT Analytics 頻道](#export-to-iot-analytics)
+ [Amazon Kinesis 資料串流](#export-to-kinesis)
+ [AWS IoT SiteWise 資產屬性](#export-to-iot-sitewise)
+ [Amazon S3 物件](#export-to-s3)

您負責維護這些 AWS 雲端 資源。

## AWS IoT Analytics 頻道
<a name="export-to-iot-analytics"></a>

串流管理員支援自動匯出至 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 可讓您對資料執行進階分析，以協助做出商業決策並改善機器學習模型。如需詳細資訊，請參閱*AWS IoT Analytics 《 使用者指南*》中的[什麼是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html)。

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `IoTAnalyticsConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTAnalyticsConfig) 
+ Java 開發套件中的 [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html) 
+ Node.js SDK 中的 [IoTAnalyticsConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html) 

### 要求
<a name="export-to-iot-analytics-reqs"></a>

此匯出目的地有下列需求：
+ 中的目標頻道 AWS IoT Analytics 必須與 Greengrass 群組位於相同 AWS 帳戶 和 AWS 區域 。
+ 必須[Greengrass 群組角色](group-role.md)允許將 `iotanalytics:BatchPutMessage`許可設為目標頻道。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "iotanalytics:BatchPutMessage"
              ],
              "Resource": [
              "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
      "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

若要建立匯出至 的串流 AWS IoT Analytics，您的 Lambda [函數會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`IoTAnalyticsConfig`物件的匯出定義串流。此物件會定義匯出設定，例如目標頻道、批次大小、批次間隔和優先順序。

當您的 Lambda 函數從裝置接收資料時，它們會將包含資料 Blob [的訊息附加](work-with-streams.md#streammanagerclient-append-message)到目標串流。

然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

 

## Amazon Kinesis 資料串流
<a name="export-to-kinesis"></a>

串流管理員支援自動匯出至 Amazon Kinesis Data Streams。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用於彙總大量資料，並將其載入資料倉儲或地圖縮減叢集。如需詳細資訊，請參閱[《Amazon Kinesis 開發人員指南》中的什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。 *Amazon Kinesis *

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `KinesisConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.KinesisConfig) 
+ Java 開發套件中的 [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html) 
+ Node.js SDK 中的 [KinesisConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html) 

### 要求
<a name="export-to-kinesis-reqs"></a>

此匯出目的地有下列需求：
+ Kinesis Data Streams 中的目標串流必須與 Greengrass 群組位於相同 AWS 帳戶 和 AWS 區域 。
+ 必須[Greengrass 群組角色](group-role.md)允許 以資料串流為目標的`kinesis:PutRecords`許可。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "kinesis:PutRecords"
              ],
              "Resource": [
              "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
      "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

若要建立匯出至 Kinesis Data Streams 的串流，您的 Lambda [函數會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`KinesisConfig`物件的匯出定義串流。此物件會定義匯出設定，例如目標資料串流、批次大小、批次間隔和優先順序。

當您的 Lambda 函數從裝置接收資料時，它們會將包含資料 Blob [的訊息附加](work-with-streams.md#streammanagerclient-append-message)到目標串流。然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

串流管理員會為每個上傳至 Amazon Kinesis 的記錄產生唯一的隨機 UUID 做為分割區索引鍵。

 

## AWS IoT SiteWise 資產屬性
<a name="export-to-iot-sitewise"></a>

串流管理員支援自動匯出至 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 可讓您大規模收集、組織和分析工業設備的資料。如需詳細資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的[什麼是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html)。

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `IoTSiteWiseConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.IoTSiteWiseConfig) 
+ Java 開發套件中的 [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html) 
+ Node.js SDK 中的 [IoTSiteWiseConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html) 

**注意**  
AWS 也提供 [IoT SiteWise 連接器](iot-sitewise-connector.md)，這是您可以與 OPC-UA 來源搭配使用的預先建置解決方案。

### 要求
<a name="export-to-iot-sitewise-reqs"></a>

此匯出目的地有下列需求：
+ 中的目標資產屬性 AWS IoT SiteWise 必須與 Greengrass 群組位於相同 AWS 帳戶 和 AWS 區域 。
**注意**  
如需 AWS IoT SiteWise 支援的區域清單，請參閱《 *AWS 一般參考*》中的[AWS IoT SiteWise 端點和配額](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region)。
+ 必須[Greengrass 群組角色](group-role.md)允許 以資產屬性為目標的`iotsitewise:BatchPutAssetPropertyValue`許可。下列範例政策使用 `iotsitewise:assetHierarchyPath`條件索引鍵授予對目標根資產及其子資產的存取權。您可以從`Condition`政策中移除 ，以允許存取所有 AWS IoT SiteWise 資產或指定個別資產ARNs。

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
               "Effect": "Allow",
               "Action": "iotsitewise:BatchPutAssetPropertyValue",
               "Resource": "*",
               "Condition": {
                   "StringLike": {
                       "iotsitewise:assetHierarchyPath": [
                           "/root node asset ID",
                           "/root node asset ID/*"
                       ]
                   }
               }
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

  如需重要的安全性資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的 [ BatchPutAssetPropertyValue 授權](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action)。

### 匯出至 AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

若要建立匯出至 的串流 AWS IoT SiteWise，您的 Lambda [函數會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`IoTSiteWiseConfig`物件的匯出定義串流。此物件會定義匯出設定，例如批次大小、批次間隔和優先順序。

當您的 Lambda 函數從裝置接收資產屬性資料時，它們會將包含資料的訊息附加到目標串流。訊息是 JSON 序列化`PutAssetPropertyValueEntry`物件，其中包含一或多個資產屬性的屬性值。如需詳細資訊，請參閱[附加訊息](work-with-streams.md#streammanagerclient-append-message-sitewise)以取得 AWS IoT SiteWise 匯出目的地。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>當您傳送資料至 時 AWS IoT SiteWise，您的資料必須符合 `BatchPutAssetPropertyValue`動作的要求。如需詳細資訊，請參閱《AWS IoT SiteWise API 參考》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

 

您可以調整串流管理員設定和 Lambda 函數邏輯，以設計匯出策略。例如：
+ 對於近乎即時的匯出，請設定低批次大小和間隔設定，並在接收資料時將資料附加至串流。
+ 為了最佳化批次處理、降低頻寬限制或將成本降至最低，您的 Lambda 函數可以在將資料附加至串流之前，合併單一資產屬性收到的timestamp-quality-value(TQV) 資料點。一種策略是在一個訊息中批次處理最多 10 個不同的屬性資產組合或屬性別名的項目，而不是為相同的屬性傳送多個項目。這有助於串流管理員保持在[AWS IoT SiteWise 配額](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html)內。

 

## Amazon S3 物件
<a name="export-to-s3"></a>

串流管理員支援自動匯出至 Amazon S3。<a name="s3-export-destination"></a>您可以使用 Amazon S3 來存放和擷取大量資料。如需詳細資訊，請參閱[《Amazon Simple Storage Service 開發人員指南》中的什麼是 Amazon S3？](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)。 **

在 AWS IoT Greengrass 核心 SDK 中，您的 Lambda 函數會使用 `S3ExportTaskExecutorConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.S3ExportTaskExecutorConfig) 
+ Java 開發套件中的 [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html) 
+ Node.js SDK 中的 [S3ExportTaskExecutorConfig](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html) 

### 要求
<a name="export-to-s3-reqs"></a>

此匯出目的地有下列需求：
+ 目標 Amazon S3 儲存貯體必須與 Greengrass 群組 AWS 帳戶 位於相同的 中。
+ 如果 Greengrass 群組[的預設容器化](lambda-group-config.md#lambda-containerization-groupsettings)是 **Greengrass 容器**，您必須將 [STREAM\$1MANAGER\$1READ\$1ONLY\$1DIRS](configure-stream-manager.md#stream-manager-read-only-directories) 參數設定為使用根檔案系統下`/tmp`或不在其中的輸入檔案目錄。
+ 如果在 **Greengrass 容器**模式下執行的 Lambda 函數將輸入檔案寫入輸入檔案目錄，您必須為目錄建立本機磁碟區資源，並將目錄掛載到具有寫入許可的容器。這可確保檔案寫入根檔案系統，並在容器外部可見。如需詳細資訊，請參閱[使用 Lambda 函數和連接器存取本機資源](access-local-resources.md)。
+ 必須[Greengrass 群組角色](group-role.md)允許目標儲存貯體的下列許可。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "s3:PutObject",
                  "s3:AbortMultipartUpload",
                  "s3:ListMultipartUploadParts"
              ],
              "Resource": [
                  "arn:aws:s3:::bucket-1-name/*",
                  "arn:aws:s3:::bucket-2-name/*"
              ]
          }
      ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 Amazon S3
<a name="export-streams-to-s3"></a>

若要建立匯出至 Amazon S3 的串流，您的 Lambda 函數會使用 `S3ExportTaskExecutorConfig` 物件來設定匯出政策。政策會定義匯出設定，例如分段上傳閾值和優先順序。對於 Amazon S3 匯出，串流管理員會上傳從核心裝置上的本機檔案讀取的資料。若要啟動上傳，您的 Lambda 函數會將匯出任務附加至目標串流。匯出任務包含輸入檔案和目標 Amazon S3 物件的相關資訊。串流管理員會依照任務附加至串流的順序來執行任務。

**注意**  
<a name="bucket-not-key-must-exist"></a>目標儲存貯體必須已存在於您的 中 AWS 帳戶。如果指定金鑰的物件不存在，串流管理員會為您建立物件。

 此高階工作流程如下圖所示。

![\[Amazon S3 匯出的串流管理員工作流程圖表。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v1/developerguide/images/stream-manager-s3.png)


串流管理員會使用分段上傳閾值屬性、[最小組件大小](configure-stream-manager.md#stream-manager-minimum-part-size)設定，以及輸入檔案的大小來判斷如何上傳資料。分段上傳閾值必須大於或等於最小分段大小。如果您想要平行上傳資料，您可以建立多個串流。

指定目標 Amazon S3 物件的金鑰可以在`!{timestamp:value}`預留位置中包含有效的 [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 字串。您可以根據輸入檔案資料上傳的時間，使用這些時間戳記預留位置來分割 Amazon S3 中的資料。例如，下列金鑰名稱會解析為 等值`my-key/2020/12/31/data.txt`。

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**注意**  
如果您想要監控串流的匯出狀態，請先建立狀態串流，然後將匯出串流設定為使用它。如需詳細資訊，請參閱[監控匯出任務](#monitor-export-status-s3)。

#### 管理輸入資料
<a name="manage-s3-input-data"></a>

您可以編寫程式碼，讓 IoT 應用程式用來管理輸入資料的生命週期。下列工作流程範例示範如何使用 Lambda 函數來管理此資料。

1. 本機程序會從裝置或周邊接收資料，然後將資料寫入核心裝置上的目錄中的檔案。這些是串流管理員的輸入檔案。
**注意**  
若要判斷您是否必須設定輸入檔案目錄的存取權，請參閱 [STREAM\$1MANAGER\$1READ\$1ONLY\$1DIRS](configure-stream-manager.md#stream-manager-read-only-directories) 參數。  
串流管理員在 中執行的程序會繼承群組[預設存取身分](lambda-group-config.md#lambda-access-identity-groupsettings)的所有檔案系統許可。串流管理員必須具有存取輸入檔案的許可。如有必要，您可以使用 `chmod(1)`命令來變更檔案的許可。

1. Lambda 函數會掃描目錄，並在[建立新檔案時將匯出任務附加](work-with-streams.md#streammanagerclient-append-message-export-task)至目標串流。任務是 JSON 序列化`S3ExportTaskDefinition`物件，可指定輸入檔案的 URL、目標 Amazon S3 儲存貯體和金鑰，以及選用的使用者中繼資料。

1. 串流管理員會讀取輸入檔案，並依附加任務的順序將資料匯出至 Amazon S3。<a name="bucket-not-key-must-exist"></a>目標儲存貯體必須已存在於您的 中 AWS 帳戶。如果指定金鑰的物件不存在，串流管理員會為您建立物件。

1. Lambda 函數會從狀態串流[讀取訊息](work-with-streams.md#streammanagerclient-read-messages)，以監控匯出狀態。匯出任務完成後，Lambda 函數可以刪除對應的輸入檔案。如需詳細資訊，請參閱[監控匯出任務](#monitor-export-status-s3)。

### 監控匯出任務
<a name="monitor-export-status-s3"></a>

您可以編寫程式碼，讓 IoT 應用程式用來監控 Amazon S3 匯出的狀態。您的 Lambda 函數必須建立狀態串流，然後將匯出串流設定為將狀態更新寫入狀態串流。單一狀態串流可以從匯出至 Amazon S3 的多個串流接收狀態更新。

首先，[建立](work-with-streams.md#streammanagerclient-create-message-stream)要用作狀態串流的串流。您可以設定串流的大小和保留政策，以控制狀態訊息的生命週期。例如：
+ `Memory` 如果您不想儲存狀態訊息，請將 `Persistence`設定為 。
+ `StrategyOnFull` 設定為 ，`OverwriteOldestData`以免遺失新的狀態訊息。

然後，建立或更新匯出串流以使用狀態串流。具體而言，請設定串流`S3ExportTaskExecutorConfig`匯出組態的狀態組態屬性。這會通知串流管理員將有關匯出任務的狀態訊息寫入狀態串流。在 `StatusConfig` 物件中，指定狀態串流的名稱和詳細程度。下列支援的值範圍從最詳細 (`ERROR`) 到最詳細 (`TRACE`)。預設值為 `INFO`。
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

 

下列範例工作流程顯示 Lambda 函數如何使用狀態串流來監控匯出狀態。

1. 如前一個工作流程所述，Lambda 函數[會將匯出任務附加](work-with-streams.md#streammanagerclient-append-message-export-task)至設定為將有關匯出任務的狀態訊息寫入狀態串流的串流。附加操作會傳回代表任務 ID 的序號。

1. Lambda 函數會從狀態串流循序[讀取訊息](work-with-streams.md#streammanagerclient-read-messages)，然後根據串流名稱和任務 ID 或根據訊息內容的匯出任務屬性來篩選訊息。例如，Lambda 函數可以依匯出任務的輸入檔案 URL 進行篩選，其由訊息內容中的`S3ExportTaskDefinition`物件表示。

   下列狀態碼表示匯出任務已達到已完成狀態：
   + `Success`。 上傳已成功完成。
   + `Failure`。 串流管理員發生錯誤，例如，指定的儲存貯體不存在。解決問題後，您可以再次將匯出任務附加到串流。
   + `Canceled`。 任務因為串流或匯出定義已刪除，或任務的time-to-live(TTL) 期間過期而中止。
**注意**  
任務也可能的狀態為 `InProgress`或 `Warning`。當事件傳回不會影響任務執行的錯誤時，串流管理員會發出警告。例如，無法清除中止的部分上傳會傳回警告。

1. 匯出任務完成後，Lambda 函數可以刪除對應的輸入檔案。

下列範例顯示 Lambda 函數如何讀取和處理狀態訊息。

------
#### [ Python ]

```
import time
from greengrasssdk.stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from greengrasssdk.stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[read\$1messages](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.streammanagerclient.html#greengrasssdk.stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-python/_apidoc/greengrasssdk.stream_manager.data.html#greengrasssdk.stream_manager.data.StatusMessage)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require('aws-greengrass-core-sdk').StreamManager;

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[readMessages](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [StatusMessage](https://aws.github.io/aws-greengrass-core-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)

------