

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 管理 Greengrass 核心裝置上的資料串流
<a name="manage-data-streams"></a>

AWS IoT Greengrass 串流管理員可讓您更有效率且更可靠地將大量 IoT 資料傳輸至 AWS 雲端。串流管理員會在將資料串流匯出至 之前處理 AWS IoT Greengrass 核心上的資料串流 AWS 雲端。串流管理員會與常見的邊緣案例整合，例如機器學習 (ML) 推論，其中 AWS IoT Greengrass Core 裝置會在將資料匯出至 AWS 雲端 或本機儲存目的地之前處理和分析資料。

串流管理員提供通用界面，可簡化自訂元件開發，因此您不需要建置自訂串流管理功能。您的元件可以使用標準化機制來處理大量串流和管理本機資料保留政策。您可以為每個串流定義儲存類型、大小和資料保留的政策，以控制串流管理員處理和匯出資料的方式。

串流管理員可在間歇性或有限連線的環境中運作。您可以定義頻寬使用、逾時行為，以及 AWS IoT Greengrass 核心在連線或中斷連線時處理串流資料的方式。您也可以設定優先順序，以控制核心匯出串流至 的順序 AWS IoT Greengrass AWS 雲端。這可讓您比其他資料更快地處理關鍵資料。

您可以設定串流管理員自動將資料匯出至 ， AWS 雲端 以供儲存或進一步處理和分析。串流管理員支援匯出至下列 AWS 雲端 目的地：
+ 您針對資料執行進階分析的頻道 in AWS IoT Analytics. AWS IoT Analytics let，以協助做出商業決策並改善機器學習模型。如需詳細資訊，請參閱《 AWS IoT Analytics使用者指南**》中的[什麼是AWS IoT Analytics ？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html)。
+ Amazon Kinesis Data Streams 中的串流。您可以使用 Kinesis Data Streams 彙總大量資料，並將其載入資料倉儲或 MapReduce 叢集。如需詳細資訊，請參閱《Amazon Kinesis Data Streams 開發人員指南》**中的[什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。
+ 您大規模從工業設備收集、組織和分析資料的資產屬性 in. AWS IoT SiteWise AWS IoT SiteWise let。如需詳細資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的[什麼是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html)。
+ Amazon Simple Storage Service Amazon S3 中的物件。您可以使用 Amazon S3 來存放和擷取大量資料。如需詳細資訊，請參閱[《Amazon Simple Storage Service 開發人員指南》中的什麼是 Amazon S3？](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)。 **

## 串流管理工作流程
<a name="stream-manager-workflow"></a>

您的 IoT 應用程式會透過 Stream Manager SDK 與串流管理員互動。

在簡單的工作流程中， AWS IoT Greengrass 核心上的元件會使用 IoT 資料，例如時間序列溫度和壓力指標。元件可能會篩選或壓縮資料，然後呼叫 Stream Manager SDK，將資料寫入串流管理員中的串流。串流管理員可以根據您為串流定義的政策， AWS 雲端 自動將串流匯出至 。元件也可以將資料直接傳送到本機資料庫或儲存儲存庫。

您的 IoT 應用程式可以包含讀取或寫入串流的多個自訂元件。這些元件可以讀取和寫入串流，以篩選、彙總和分析 AWS IoT Greengrass 核心裝置上的資料。這可讓您快速回應本機事件，並在資料從核心傳輸到 AWS 雲端 或本機目的地之前擷取寶貴的資訊。

若要開始使用，請將串流管理員元件部署到您的 AWS IoT Greengrass 核心裝置。在 部署中，設定串流管理員元件參數，以定義套用至 Greengrass 核心裝置上所有串流的設定。使用這些參數來根據您的業務需求和環境限制，控制串流管理員如何存放、處理和匯出串流。

設定串流管理員之後，您可以建立和部署 IoT 應用程式。這些通常是在 Stream Manager SDK `StreamManagerClient`中用來建立串流並與串流互動的自訂元件。建立串流時，您可以定義每個串流政策，例如匯出目的地、優先順序和持久性。

## 要求
<a name="stream-manager-requirements"></a>

下列需求適用於使用串流管理員：
+ 除了 AWS IoT Greengrass Core 軟體之外，串流管理員至少需要 70 MB RAM。您的總記憶體需求視您的工作負載而定。
+ AWS IoT Greengrass 元件必須使用 Stream Manager SDK 與串流管理員互動。串流管理員 SDK 提供下列語言：<a name="stream-manager-sdk-download-list"></a>
  + [適用於 Java 的 Stream Manager SDK](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-java/) (1.1.0 版或更新版本）
  + [適用於 Node.js 的 Stream Manager SDK](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-js/) (v1.1.0 或更新版本）
  + [適用於 Python 的 Stream Manager SDK](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/) (1.1.0 版或更新版本）
+ AWS IoT Greengrass 元件必須在其配方中指定串流管理員元件 (`aws.greengrass.StreamManager`) 做為相依性，才能使用串流管理員。
**注意**  <a name="stream-manager-upgrade-note"></a>
如果您使用串流管理員將資料匯出至雲端，則無法將串流管理員元件的 2.0.7 版升級至 v2.0.8 和 v2.0.11 之間的版本。如果您是第一次部署串流管理員，強烈建議您部署最新版本的串流管理員元件。
+ 如果您定義串流的 AWS 雲端 匯出目的地，則必須建立匯出目標，並在 [Greengrass 裝置角色](device-service-role.md)中授予存取許可。視目的地而定，可能也會套用其他要求。如需詳細資訊，請參閱：<a name="export-destinations-links"></a>
  + [AWS IoT Analytics 頻道](stream-export-configurations.md#export-to-iot-analytics)
  + [Amazon Kinesis 資料串流](stream-export-configurations.md#export-to-kinesis)
  + [AWS IoT SiteWise 資產屬性](stream-export-configurations.md#export-to-iot-sitewise)
  + [Amazon S3 物件](stream-export-configurations.md#export-to-s3)

  您負責維護這些 AWS 雲端 資源。

## 資料安全
<a name="stream-manager-security"></a>

當您使用串流管理員時，請注意下列安全性考量。

### 本機資料安全性
<a name="stream-manager-security-stream-data"></a>

AWS IoT Greengrass 不會在核心裝置上的本機元件之間加密靜態或傳輸中的串流資料。
+ **靜態資料**。串流資料以本機之方式儲存在儲存目錄中。為了資料安全，如果啟用， AWS IoT Greengrass 則依賴檔案許可和全磁碟加密。您可以使用選用的 [STREAM\$1MANAGER\$1STORE\$1ROOT\$1DIR](configure-stream-manager.md#STREAM_MANAGER_STORE_ROOT_DIR) 參數來指定儲存目錄。如果您稍後將此參數變更為使用不同的儲存目錄， AWS IoT Greengrass 不會刪除先前的儲存目錄或其內容。
+ **本機傳輸中的資料**。 AWS IoT Greengrass 不會加密資料來源、 AWS IoT Greengrass 元件、串流管理員 SDK 和串流管理員之間本機傳輸中的串流資料。
+ **傳輸到 的資料 AWS 雲端**。串流管理員匯出至 的資料串流 AWS 雲端 使用標準 AWS 服務用戶端加密搭配 Transport Layer Security (TLS)。

### 用戶端身分驗證
<a name="stream-manager-security-client-authentication"></a>

串流管理員用戶端使用串流管理員 SDK 與串流管理員通訊。啟用用戶端身分驗證時，只有 Greengrass 元件可以與串流管理員中的串流互動。停用用戶端身分驗證時，在 Greengrass 核心裝置上執行的任何程序都可以與串流管理員中的串流互動。您應該只在商業案例需要時，才停用身分驗證。

您可以使用 [STREAM\$1MANAGER\$1AUTHENTICATE\$1CLIENT](configure-stream-manager.md#STREAM_MANAGER_AUTHENTICATE_CLIENT) 參數來設定用戶端身分驗證模式。您可以在將串流管理員元件部署至核心裝置時設定此參數。


****  

|   | 已啟用 | Disabled | 
| --- | --- | --- | 
| 參數值 | `true` (預設和建議) | `false` | 
| 允許用戶端 | 核心裝置上的 Greengrass 元件 | 核心裝置上的 Greengrass 元件 在 Greengrass 核心裝置上執行的其他程序 | 

## 另請參閱
<a name="stream-manager-see-also"></a>
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [使用 StreamManagerClient 搭配串流](work-with-streams.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)

# 設定 AWS IoT Greengrass 串流管理員
<a name="configure-stream-manager"></a>

在 Greengrass 核心裝置上，串流管理員可以存放、處理和匯出 IoT 裝置資料。串流管理員提供您用來設定執行時間設定的參數。這些設定適用於 Greengrass 核心裝置上的所有串流。您可以在部署元件時，使用 AWS IoT Greengrass 主控台或 API 來設定串流管理員設定。變更會在部署完成後生效。

## 串流管理員參數
<a name="stream-manager-parameters"></a>

串流管理員提供下列參數，您可以在將元件部署到核心裝置時設定這些參數。所有參數都是選用的。

**儲存目錄**  <a name="STREAM_MANAGER_STORE_ROOT_DIR"></a>
參數名稱：`STREAM_MANAGER_STORE_ROOT_DIR`  
用於存放串流之本機資料夾的絕對路徑。此值必須以正斜線開頭 (例如 `/data`)。  
<a name="stream-manager-store-root-dir-parameter-folder-requirements"></a>您必須指定現有的資料夾，而且[執行串流管理員元件的系統使用者](configure-greengrass-core-v2.md#configure-component-user)必須具有讀取和寫入此資料夾的許可。例如，您可以執行下列命令來建立和設定資料夾 `/var/greengrass/streams`，您指定此資料夾做為串流管理員根資料夾。這些命令允許預設系統使用者 `ggc_user`讀取和寫入此資料夾。  

```
sudo mkdir /var/greengrass/streams
sudo chown ggc_user /var/greengrass/streams
sudo chmod 700 /var/greengrass/streams
```
如需保護串流資料安全的資訊，請參閱 [本機資料安全性](manage-data-streams.md#stream-manager-security-stream-data)。  
預設：`/greengrass/v2/work/aws.greengrass.StreamManager`

**伺服器連接埠**  
參數名稱：`STREAM_MANAGER_SERVER_PORT`  
用於與串流管理員通訊的本機連接埠號碼。預設值為 `8088`。  
您可以指定 `0`使用隨機的可用連接埠。

**驗證用戶端**  <a name="STREAM_MANAGER_AUTHENTICATE_CLIENT"></a>
參數名稱：`STREAM_MANAGER_AUTHENTICATE_CLIENT`  
表示用戶端是否須經過驗證才能與串流管理員互動。用戶端和串流管理員之間的所有互動都由串流管理員 SDK 控制。此參數決定哪些用戶端可以呼叫 Stream Manager SDK 來使用串流。如需詳細資訊，請參閱[用戶端身分驗證](manage-data-streams.md#stream-manager-security-client-authentication)。  
有效值為 `true` 或 `false`。預設值為 `true` (建議)。  
+ `true`。 僅允許 Greengrass 元件做為用戶端。元件使用內部 AWS IoT Greengrass 核心通訊協定來驗證 Stream Manager SDK。
+ `false`。 允許在 AWS IoT Greengrass Core 上執行的任何程序成為用戶端。除非您的商業案例需要，`false`否則請勿將值設定為 。例如，`false`只有在核心裝置上的非元件程序必須直接與串流管理員通訊時，才能使用 。

**最高頻寬**  
參數名稱：`STREAM_MANAGER_EXPORTER_MAX_BANDWIDTH`  
可用來匯出資料的平均最高頻寬 (以千位元數/秒為單位)。預設允許無限使用可用頻寬。

**執行緒集區大小**  
參數名稱：`STREAM_MANAGER_EXPORTER_THREAD_POOL_SIZE`  
可用於匯出資料的作用中執行緒數量上限。預設值為 `5`。  
最佳大小取決於您的硬體、串流磁碟區和規劃的匯出串流數量。如果匯出速度很慢，您可以調整此設定，找出適合您硬體和商務案例的最佳大小。核心裝置硬體的 CPU 和記憶體是限制因素。首先，您可以嘗試將此值設定為等同於裝置上處理器核心的數量。  
請小心不要設定高於硬體可支援的大小。每個串流都會耗用硬體資源，因此請嘗試限制受限裝置上的匯出串流數量。

**JVM 引數**  
參數名稱：`JVM_ARGS`  
自訂 Java 虛擬機器參數，以在啟動時傳遞給串流管理員。多個引數應用空格分隔。  
僅限必須覆寫 JVM 使用的預設設定時，才能使用此參數。例如，如果您計劃匯出大量串流，可能需要增加預設堆積大小。

**Logging level (記錄層級)**  
參數名稱：`LOG_LEVEL`  
元件的記錄層級。從下列日誌層級中進行選擇，此處依層級順序列出：  
+ `TRACE`
+ `DEBUG`
+ `INFO`
+ `WARN`
+ `ERROR`
預設：`INFO`

**分段上傳的大小下限**  <a name="stream-manager-minimum-part-size"></a>
參數名稱：`STREAM_MANAGER_EXPORTER_S3_DESTINATION_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES`  
分段中部分的大小下限 （以位元組為單位） 上傳至 Amazon S3。串流管理員使用此設定和輸入檔案的大小，來判斷如何在分段 PUT 請求中批次處理資料。預設和最小值為`5242880`位元組 (5 MB)。  
串流管理員使用串流的 `sizeThresholdForMultipartUploadBytes` 屬性來判斷要匯出至 Amazon S3，做為單一或分段上傳。使用者定義的 Greengrass 元件會在建立匯出至 Amazon S3 的串流時設定此閾值。預設閾值為 5 MB。

## 另請參閱
<a name="configure-stream-manager-see-also"></a>
+ [管理 Greengrass 核心裝置上的資料串流](manage-data-streams.md)
+ [使用 StreamManagerClient 搭配串流](work-with-streams.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)

# 建立使用串流管理員的自訂元件
<a name="use-stream-manager-in-custom-components"></a>

在自訂 Greengrass 元件中使用串流管理員來存放、處理和匯出 IoT 裝置資料。使用本節中的程序和範例來建立與串流管理員搭配使用的元件配方、成品和應用程式。如需如何開發和測試元件的詳細資訊，請參閱 [建立 AWS IoT Greengrass 元件](create-components.md)。

**Topics**
+ [定義使用串流管理員的元件配方](#stream-manager-recipes)
+ [連線至應用程式程式碼中的串流管理員](#connect-to-stream-manager)

## 定義使用串流管理員的元件配方
<a name="stream-manager-recipes"></a>

若要在自訂元件中使用串流管理員，您必須將`aws.greengrass.StreamManager`元件定義為相依性。您也必須提供 Stream Manager SDK。完成下列任務，以您選擇的語言下載和使用 Stream Manager SDK。

### 使用適用於 Java 的 Stream Manager 開發套件
<a name="use-stream-manager-sdk-java"></a>

適用於 Java 的 Stream Manager 開發套件以 JAR 檔案提供，您可以用來編譯元件。然後，您可以建立包含 Stream Manager SDK 的應用程式 JAR、將應用程式 JAR 定義為元件成品，並在元件生命週期中執行應用程式 JAR。

**使用適用於 Java 的 Stream Manager 開發套件**

1. 下載適用於 [Java JAR 的 Stream Manager SDK 檔案](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-java/blob/main/sdk/aws-greengrass-stream-manager-sdk-java.jar)。

1. 執行下列其中一項操作，從 Java 應用程式和 Stream Manager SDK JAR 檔案建立元件成品：
   + 將應用程式建置為包含 Stream Manager SDK JAR 的 JAR 檔案，然後在元件配方中執行此 JAR 檔案。
   + 將 Stream Manager SDK JAR 定義為元件成品。當您在元件配方中執行應用程式時，請將該成品新增至 classpath。

   您的元件配方可能如下所示。此元件會執行 [StreamManagerS3.java](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-java/blob/main/samples/StreamManagerS3/src/main/java/com/amazonaws/greengrass/examples/StreamManagerS3.java) 範例的修改版本，其中 `StreamManagerS3.jar`包含 Stream Manager SDK JAR。

------
#### [ JSON ]

   ```
   {
     "RecipeFormatVersion": "2020-01-25",
     "ComponentName": "com.example.StreamManagerS3Java",
     "ComponentVersion": "1.0.0",
     "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
     "ComponentPublisher": "Amazon",
     "ComponentDependencies": {
       "aws.greengrass.StreamManager": {
         "VersionRequirement": "^2.0.0"
       }
     },
     "Manifests": [
       {
         "Lifecycle": {
           "Run": "java -jar {artifacts:path}/StreamManagerS3.jar"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar"
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ---
   RecipeFormatVersion: '2020-01-25'
   ComponentName: com.example.StreamManagerS3Java
   ComponentVersion: 1.0.0
   ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
   ComponentPublisher: Amazon
   ComponentDependencies:
     aws.greengrass.StreamManager:
       VersionRequirement: "^2.0.0"
   Manifests:
     - Lifecycle:
         Run: java -jar {artifacts:path}/StreamManagerS3.jar
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Java/1.0.0/StreamManagerS3.jar
   ```

------

   如需如何開發和測試元件的詳細資訊，請參閱 [建立 AWS IoT Greengrass 元件](create-components.md)。

### 使用適用於 Python 的 Stream Manager SDK
<a name="use-stream-manager-sdk-python"></a>

適用於 Python 的 Stream Manager SDK 可作為原始程式碼提供，您可以包含在元件中。建立 Stream Manager SDK 的 ZIP 檔案、將 ZIP 檔案定義為元件成品，並在元件生命週期中安裝 SDK 的需求。

**使用適用於 Python 的 Stream Manager SDK**

1. 複製或下載 [aws-greengrass-stream-manager-sdk-python](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python) 儲存庫。

   ```
   git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-python.git
   ```

1. 建立包含 `stream_manager` 資料夾的 ZIP 檔案，其中包含適用於 Python 的 Stream Manager SDK 的原始程式碼。您可以提供此 ZIP 檔案做為元件成品，讓 AWS IoT Greengrass Core 軟體在安裝元件時解壓縮。請執行下列操作：

   1. 開啟資料夾，其中包含您在上一個步驟中複製或下載的儲存庫。

      ```
      cd aws-greengrass-stream-manager-sdk-python
      ```

   1. 將`stream_manager`資料夾壓縮至名為 的 ZIP 檔案`stream_manager_sdk.zip`。

------
#### [ Linux or Unix ]

      ```
      zip -rv stream_manager_sdk.zip stream_manager
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -acvf stream_manager_sdk.zip stream_manager
      ```

------
#### [ PowerShell ]

      ```
      Compress-Archive stream_manager stream_manager_sdk.zip
      ```

------

   1. 確認`stream_manager_sdk.zip`檔案包含 `stream_manager` 資料夾及其內容。執行下列命令來列出 ZIP 檔案的內容。

------
#### [ Linux or Unix ]

      ```
      unzip -l stream_manager_sdk.zip
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -tf stream_manager_sdk.zip
      ```

------

      輸出應看起來如下列內容。

      ```
      Archive:  aws-greengrass-stream-manager-sdk-python/stream_manager.zip
        Length      Date    Time    Name
      ---------  ---------- -----   ----
              0  02-24-2021 20:45   stream_manager/
            913  02-24-2021 20:45   stream_manager/__init__.py
           9719  02-24-2021 20:45   stream_manager/utilinternal.py
           1412  02-24-2021 20:45   stream_manager/exceptions.py
           1004  02-24-2021 20:45   stream_manager/util.py
              0  02-24-2021 20:45   stream_manager/data/
         254463  02-24-2021 20:45   stream_manager/data/__init__.py
          26515  02-24-2021 20:45   stream_manager/streammanagerclient.py
      ---------                     -------
         294026                     8 files
      ```

1. 將 Stream Manager SDK 成品複製到元件的成品資料夾。除了 Stream Manager SDK ZIP 檔案，您的元件會使用 SDK `requirements.txt`的檔案來安裝 Stream Manager SDK 的相依性。將 *\$1/greengrass-components* 取代為您用於本機開發之資料夾的路徑。

------
#### [ Linux or Unix ]

   ```
   cp {stream_manager_sdk.zip,requirements.txt} ~/greengrass-components/artifacts/com.example.StreamManagerS3Python/1.0.0/
   ```

------
#### [ Windows Command Prompt (CMD) ]

   ```
   robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 stream_manager_sdk.zip
   robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0 requirements.txt
   ```

------
#### [ PowerShell ]

   ```
   cp .\stream_manager_sdk.zip,.\requirements.txt ~\greengrass-components\artifacts\com.example.StreamManagerS3Python\1.0.0\
   ```

------

1. 建立您的元件配方。在配方中，執行下列動作：

   1. 將 `stream_manager_sdk.zip`和 `requirements.txt` 定義為成品。

   1. 將 Python 應用程式定義為成品。

   1. 在安裝生命週期中，從 安裝 Stream Manager SDK 需求`requirements.txt`。

   1. 在執行生命週期中，將 Stream Manager SDK 附加至 `PYTHONPATH`，然後執行您的 Python 應用程式。

   您的元件配方可能如下所示。此元件會執行 [stream\$1manager\$1s3.py ](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_s3.py)範例。

------
#### [ JSON ]

   ```
   {
     "RecipeFormatVersion": "2020-01-25",
     "ComponentName": "com.example.StreamManagerS3Python",
     "ComponentVersion": "1.0.0",
     "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
     "ComponentPublisher": "Amazon",
     "ComponentDependencies": {
       "aws.greengrass.StreamManager": {
         "VersionRequirement": "^2.0.0"
       }
     },
     "Manifests": [
       {
         "Platform": {
           "os": "linux"
         },
         "Lifecycle": {
           "install": "pip3 install --user -r {artifacts:path}/requirements.txt",
           "Run": "export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk; python3 {artifacts:path}/stream_manager_s3.py"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt"
           }
         ]
       },
       {
         "Platform": {
           "os": "windows"
         },
         "Lifecycle": {
           "install": "pip3 install --user -r {artifacts:path}/requirements.txt",
           "Run": "set \"PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk\" & py -3 {artifacts:path}/stream_manager_s3.py"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt"
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ---
   RecipeFormatVersion: '2020-01-25'
   ComponentName: com.example.StreamManagerS3Python
   ComponentVersion: 1.0.0
   ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
   ComponentPublisher: Amazon
   ComponentDependencies:
     aws.greengrass.StreamManager:
       VersionRequirement: "^2.0.0"
   Manifests:
     - Platform:
         os: linux
       Lifecycle:
         install: pip3 install --user -r {artifacts:path}/requirements.txt
         Run: |
           export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
           python3 {artifacts:path}/stream_manager_s3.py
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip
           Unarchive: ZIP
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt
     - Platform:
         os: windows
       Lifecycle:
         install: pip3 install --user -r {artifacts:path}/requirements.txt
         Run: |
           set "PYTHONPATH=%PYTHONPATH%;{artifacts:decompressedPath}/stream_manager_sdk"
           py -3 {artifacts:path}/stream_manager_s3.py
       Artifacts:
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_sdk.zip
           Unarchive: ZIP
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/stream_manager_s3.py
         - URI: s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3Python/1.0.0/requirements.txt
   ```

------

   如需如何開發和測試元件的詳細資訊，請參閱 [建立 AWS IoT Greengrass 元件](create-components.md)。

### 使用適用於 JavaScript 的 Stream Manager SDK
<a name="use-stream-manager-sdk-javascript"></a>

適用於 JavaScript 的 Stream Manager SDK 可作為原始程式碼提供，您可以包含在元件中。建立 Stream Manager SDK 的 ZIP 檔案、將 ZIP 檔案定義為元件成品，並在元件生命週期中安裝 SDK。

**使用適用於 JavaScript 的 Stream Manager SDK**

1. 複製或下載 [aws-greengrass-stream-manager-sdk-js](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-js) 儲存庫。

   ```
   git clone git@github.com:aws-greengrass/aws-greengrass-stream-manager-sdk-js.git
   ```

1. 建立包含 `aws-greengrass-stream-manager-sdk` 資料夾的 ZIP 檔案，其中包含適用於 JavaScript 的 Stream Manager SDK 的原始程式碼。您可以提供此 ZIP 檔案做為元件成品，讓 AWS IoT Greengrass Core 軟體在安裝元件時解壓縮。請執行下列操作：

   1. 開啟資料夾，其中包含您在上一個步驟中複製或下載的儲存庫。

      ```
      cd aws-greengrass-stream-manager-sdk-js
      ```

   1. 將`aws-greengrass-stream-manager-sdk`資料夾壓縮至名為 的 ZIP 檔案`stream-manager-sdk.zip`。

------
#### [ Linux or Unix ]

      ```
      zip -rv stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -acvf stream-manager-sdk.zip aws-greengrass-stream-manager-sdk
      ```

------
#### [ PowerShell ]

      ```
      Compress-Archive aws-greengrass-stream-manager-sdk stream-manager-sdk.zip
      ```

------

   1. 確認`stream-manager-sdk.zip`檔案包含 `aws-greengrass-stream-manager-sdk` 資料夾及其內容。執行下列命令來列出 ZIP 檔案的內容。

------
#### [ Linux or Unix ]

      ```
      unzip -l stream-manager-sdk.zip
      ```

------
#### [ Windows Command Prompt (CMD) ]

      ```
      tar -tf stream-manager-sdk.zip
      ```

------

      輸出應看起來如下列內容。

      ```
      Archive:  stream-manager-sdk.zip
        Length      Date    Time    Name
      ---------  ---------- -----   ----
              0  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/
            369  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/package.json
           1017  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/util.js
           8374  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/utilInternal.js
           1937  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/exceptions.js
              0  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/data/
         353343  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/data/index.js
          22599  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/client.js
            216  02-24-2021 22:36   aws-greengrass-stream-manager-sdk/index.js
      ---------                     -------
         387855                     9 files
      ```

1. 將 Stream Manager SDK 成品複製到元件的成品資料夾。將 *\$1/greengrass-components* 取代為您用於本機開發之資料夾的路徑。

------
#### [ Linux or Unix ]

   ```
   cp stream-manager-sdk.zip ~/greengrass-components/artifacts/com.example.StreamManagerS3JS/1.0.0/
   ```

------
#### [ Windows Command Prompt (CMD) ]

   ```
   robocopy . %USERPROFILE%\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0 stream-manager-sdk.zip
   ```

------
#### [ PowerShell ]

   ```
   cp .\stream-manager-sdk.zip ~\greengrass-components\artifacts\com.example.StreamManagerS3JS\1.0.0\
   ```

------

1. 建立您的元件配方。在配方中，執行下列動作：

   1. `stream-manager-sdk.zip` 定義為成品。

   1. 將 JavaScript 應用程式定義為成品。

   1. 在安裝生命週期中，從`stream-manager-sdk.zip`成品安裝 Stream Manager SDK。此`npm install`命令會建立`node_modules`資料夾，其中包含 Stream Manager SDK 及其相依性。

   1. 在執行生命週期中，將 `node_modules` 資料夾附加至 `NODE_PATH`，然後執行 JavaScript 應用程式。

   您的元件配方可能如下所示。此元件會執行 [StreamManagerS3](https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-js/blob/main/samples/StreamManagerS3/index.js) 範例。

------
#### [ JSON ]

   ```
   {
     "RecipeFormatVersion": "2020-01-25",
     "ComponentName": "com.example.StreamManagerS3JS",
     "ComponentVersion": "1.0.0",
     "ComponentDescription": "Uses stream manager to upload a file to an S3 bucket.",
     "ComponentPublisher": "Amazon",
     "ComponentDependencies": {
       "aws.greengrass.StreamManager": {
         "VersionRequirement": "^2.0.0"
       }
     },
     "Manifests": [
       {
         "Platform": {
           "os": "linux"
         },
         "Lifecycle": {
           "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk",
           "Run": "export NODE_PATH=$NODE_PATH:{work:path}/node_modules; node {artifacts:path}/index.js"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js"
           }
         ]
       },
       {
         "Platform": {
           "os": "windows"
         },
         "Lifecycle": {
           "install": "npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk",
           "Run": "set \"NODE_PATH=%NODE_PATH%;{work:path}/node_modules\" & node {artifacts:path}/index.js"
         },
         "Artifacts": [
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip",
             "Unarchive": "ZIP"
           },
           {
             "URI": "s3://amzn-s3-demo-bucket/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js"
           }
         ]
       }
     ]
   }
   ```

------
#### [ YAML ]

   ```
   ---
   RecipeFormatVersion: '2020-01-25'
   ComponentName: com.example.StreamManagerS3JS
   ComponentVersion: 1.0.0
   ComponentDescription: Uses stream manager to upload a file to an S3 bucket.
   ComponentPublisher: Amazon
   ComponentDependencies:
     aws.greengrass.StreamManager:
       VersionRequirement: "^2.0.0"
   Manifests:
     - Platform:
         os: linux
       Lifecycle:
         install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk
         Run: |
           export NODE_PATH=$NODE_PATH:{work:path}/node_modules
           node {artifacts:path}/index.js
       Artifacts:
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip
           Unarchive: ZIP
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js
     - Platform:
         os: windows
       Lifecycle:
         install: npm install {artifacts:decompressedPath}/stream-manager-sdk/aws-greengrass-stream-manager-sdk
         Run: |
           set "NODE_PATH=%NODE_PATH%;{work:path}/node_modules"
           node {artifacts:path}/index.js
       Artifacts:
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/stream-manager-sdk.zip
           Unarchive: ZIP
         - URI: s3://DOC-EXAMPLE-BUCKET/artifacts/com.example.StreamManagerS3JS/1.0.0/index.js
   ```

------

   如需如何開發和測試元件的詳細資訊，請參閱 [建立 AWS IoT Greengrass 元件](create-components.md)。

## 連線至應用程式程式碼中的串流管理員
<a name="connect-to-stream-manager"></a>

若要連線至應用程式中的串流管理員，`StreamManagerClient`請從串流管理員 SDK 建立 執行個體。此用戶端會連線至其預設連接埠 8088 上的串流管理員元件，或您指定的連接埠。如需建立執行個體`StreamManagerClient`後如何使用 的詳細資訊，請參閱 [使用 StreamManagerClient 搭配串流](work-with-streams.md)。

**Example 範例：使用預設連接埠連線至串流管理員**  

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;

public class MyStreamManagerComponent {

    void connectToStreamManagerWithDefaultPort() {
        StreamManagerClient client = StreamManagerClientFactory.standard().build();
        
        // Use the client.
    }
}
```

```
from stream_manager import (
    StreamManagerClient
)
              
def connect_to_stream_manager_with_default_port():
    client = StreamManagerClient()
    
    # Use the client.
```

```
const {
    StreamManagerClient
} = require('aws-greengrass-stream-manager-sdk');

function connectToStreamManagerWithDefaultPort() {
    const client = new StreamManagerClient();
    
    // Use the client.
}
```

**Example 範例：使用非預設連接埠連線至串流管理員**  
如果您使用預設以外的連接埠設定串流管理員，則必須使用[程序間通訊](interprocess-communication.md)從元件組態擷取連接埠。  
`port` 組態參數包含您在部署串流管理員`STREAM_MANAGER_SERVER_PORT`時在 中指定的值。

```
void connectToStreamManagerWithCustomPort() {
    EventStreamRPCConnection eventStreamRpcConnection = IPCUtils.getEventStreamRpcConnection();
    GreengrassCoreIPCClient greengrassCoreIPCClient = new GreengrassCoreIPCClient(eventStreamRpcConnection);
    List<String> keyPath = new ArrayList<>();
    keyPath.add("port");

    GetConfigurationRequest request = new GetConfigurationRequest();
    request.setComponentName("aws.greengrass.StreamManager");
    request.setKeyPath(keyPath);
    GetConfigurationResponse response =
            greengrassCoreIPCClient.getConfiguration(request, Optional.empty()).getResponse().get();
    String port = response.getValue().get("port").toString();
    System.out.print("Stream Manager is running on port: " + port);

    final StreamManagerClientConfig config = StreamManagerClientConfig.builder()
            .serverInfo(StreamManagerServerInfo.builder().port(Integer.parseInt(port)).build()).build();

    StreamManagerClient client = StreamManagerClientFactory.standard().withClientConfig(config).build();
    
    // Use the client.
}
```

```
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    GetConfigurationRequest
)
from stream_manager import (
    StreamManagerClient
)

TIMEOUT = 10

def connect_to_stream_manager_with_custom_port():
    # Use IPC to get the port from the stream manager component configuration.
    ipc_client = awsiot.greengrasscoreipc.connect()
    request = GetConfigurationRequest()
    request.component_name = "aws.greengrass.StreamManager"
    request.key_path = ["port"]
    operation = ipc_client.new_get_configuration()
    operation.activate(request)
    future_response = operation.get_response()
    response = future_response.result(TIMEOUT)
    stream_manager_port = str(response.value["port"])
    
    # Use port to create a stream manager client.
    stream_client = StreamManagerClient(port=stream_manager_port)
    
    # Use the client.
```

# 使用 StreamManagerClient 搭配串流
<a name="work-with-streams"></a>

在 Greengrass 核心裝置上執行的使用者定義 Greengrass 元件可以使用 Stream Manager SDK 中的 `StreamManagerClient` 物件，在串流[管理員](manage-data-streams.md)中建立串流，然後與串流互動。當元件建立串流時，它會定義串流的 AWS 雲端 目的地、優先順序和其他匯出和資料保留政策。若要將資料傳送至串流管理員，元件會將資料附加至串流。如果為串流定義匯出目的地，串流管理員會自動匯出串流。

**注意**  
<a name="stream-manager-clients"></a>一般而言，串流管理員的用戶端是使用者定義的 Greengrass 元件。如果您的商業案例需要，您也可以允許在 Greengrass 核心 （例如 Docker 容器） 上執行的非元件程序與串流管理員互動。如需詳細資訊，請參閱[用戶端身分驗證](manage-data-streams.md#stream-manager-security-client-authentication)。

本主題中的程式碼片段會示範用戶端如何呼叫`StreamManagerClient`方法以使用串流。如需方法及其引數的實作詳細資訊，請使用每個程式碼片段後列出的 SDK 參考連結。

如果您在 Lambda 函數中使用串流管理員，Lambda 函數應該在函數處理常式`StreamManagerClient`之外執行個體化。如果在處理常式內執行個體化，則該函數在每次叫用時都會建立 `client` 以及與串流管理員的連線。

**注意**  
如果您在處理常式內將 `StreamManagerClient` 執行個體化，則必須在 `client` 完成其工作時明確呼叫 `close()` 方法。否則，`client` 會將連線保持在開啟狀態，並將另一個執行緒保持在執行狀態，直到指令碼結束為止。

`StreamManagerClient` 支援下列操作：
+ [建立訊息串流](#streammanagerclient-create-message-stream)
+ [附加訊息](#streammanagerclient-append-message)
+ [讀取訊息](#streammanagerclient-read-messages)
+ [列出串流](#streammanagerclient-list-streams)
+ [描述訊息串流](#streammanagerclient-describe-message-stream)
+ [更新訊息串流](#streammanagerclient-update-message-stream)
+ [刪除訊息串流](#streammanagerclient-delete-message-stream)

## 建立訊息串流
<a name="streammanagerclient-create-message-stream"></a>

若要建立串流，使用者定義的 Greengrass 元件會呼叫建立方法並傳入`MessageStreamDefinition`物件。此物件會指定串流的唯一名稱，並定義串流管理員在達到串流大小上限時應如何處理新資料。您可以使用 `MessageStreamDefinition` 及其資料類型 (例如 `ExportDefinition`、`StrategyOnFull` 和 `Persistence`) 來定義其他串流屬性。其中包含：
+ 自動匯出的目標 AWS IoT SiteWise、 AWS IoT Analytics Kinesis Data Streams 和 Amazon S3 目的地。如需詳細資訊，請參閱[匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)。
+ 匯出優先順序。串流管理員會先匯出優先順序較高的串流，然後再匯出較低的串流。
+  AWS IoT Analytics、Kinesis Data Streams 和 AWS IoT SiteWise 目的地的批次大小上限和批次間隔。符合任一條件時，串流管理員會匯出訊息。
+ 存留時間 (TTL)。保證串流資料可供處理的時間量。您應該確定資料可以在這段期間內使用。這不是刪除政策。TTL 期間後，資料可能不會立即刪除。
+ 串流持久性。選擇此選項可將串流儲存至檔案系統，以便在核心重新啟動期間保留資料，或將串流儲存在記憶體中。
+ 起始序號。指定要用作匯出中起始訊息的訊息序號。

如需 的詳細資訊`MessageStreamDefinition`，請參閱目標語言的 SDK 參考：
+ Java 開發套件中的 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html) 
+ Node.js SDK 中的 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html) 
+ Python SDK 中的 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition) 

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 也提供目標目的地，可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。

建立串流後，您的 Greengrass 元件可以將[訊息附加](#streammanagerclient-append-message)到串流，以傳送資料進行匯出，並從串流[讀取訊息](#streammanagerclient-read-messages)以進行本機處理。您建立的串流數量取決於您的硬體功能和商業案例。其中一個策略是為 AWS IoT Analytics 或 Kinesis 資料串流中的每個目標頻道建立串流，但您可以定義串流的多個目標。串流的生命週期相當耐久。

### 要求
<a name="streammanagerclient-create-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-create-message-stream-examples"></a>

以下程式碼片段會建立名為 `StreamName` 的串流。它定義 `MessageStreamDefinition`和次級資料類型中的串流屬性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.create_message_stream(MessageStreamDefinition(
        name="StreamName",    # Required.
        max_size=268435456,    # Default is 256 MB.
        stream_segment_size=16777216,    # Default is 16 MB.
        time_to_live_millis=None,    # By default, no TTL is enabled.
        strategy_on_full=StrategyOnFull.OverwriteOldestData,    # Required.
        persistence=Persistence.File,    # Default is File.
        flush_on_write=False,    # Default is false.
        export_definition=ExportDefinition(    # Optional. Choose where/how the stream is exported to the AWS 雲端.
            kinesis=None,
            iot_analytics=None,
            iot_sitewise=None,
            s3_task_executor=None
        )
    ))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[create\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.create_message_stream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    client.createMessageStream(
            new MessageStreamDefinition()
                    .withName("StreamName") // Required.
                    .withMaxSize(268435456L)    // Default is 256 MB.
                    .withStreamSegmentSize(16777216L)    // Default is 16 MB.
                    .withTimeToLiveMillis(null)    // By default, no TTL is enabled.
                    .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)    // Required.
                    .withPersistence(Persistence.File)    // Default is File.
                    .withFlushOnWrite(false)    // Default is false.
                    .withExportDefinition(    // Optional. Choose where/how the stream is exported to the AWS 雲端.
                            new ExportDefinition()
                                    .withKinesis(null)
                                    .withIotAnalytics(null)
                                    .withIotSitewise(null)
                                    .withS3(null)
                    )
 
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[createMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#createMessageStream-com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition-) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
  try {
    await client.createMessageStream(
      new MessageStreamDefinition()
        .withName("StreamName") // Required.
        .withMaxSize(268435456)  // Default is 256 MB.
        .withStreamSegmentSize(16777216)  // Default is 16 MB.
        .withTimeToLiveMillis(null)  // By default, no TTL is enabled.
        .withStrategyOnFull(StrategyOnFull.OverwriteOldestData)  // Required.
        .withPersistence(Persistence.File)  // Default is File.
        .withFlushOnWrite(false)  // Default is false.
        .withExportDefinition(  // Optional. Choose where/how the stream is exported to the AWS 雲端.
          new ExportDefinition()
            .withKinesis(null)
            .withIotAnalytics(null)
            .withIotSiteWise(null)
            .withS3(null)
        )
    );
  } catch (e) {
    // Properly handle errors.
  }
});
client.onError((err) => {
  // Properly handle connection errors.
  // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[createMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#createMessageStream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

如需設定匯出目的地的詳細資訊，請參閱 [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)。

## 附加訊息
<a name="streammanagerclient-append-message"></a>

若要將資料傳送至串流管理員以進行匯出，您的 Greengrass 元件會將資料附加至目標串流。匯出目的地會決定要傳遞至此方法的資料類型。

### 要求
<a name="streammanagerclient-append-message-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-append-message-examples"></a>

#### AWS IoT Analytics 或 Kinesis Data Streams 匯出目的地
<a name="streammanagerclient-append-message-blob"></a>

下列程式碼片段附加一個訊息到名為 `StreamName` 的串流。對於 AWS IoT Analytics 或 Kinesis Data Streams 目的地，您的 Greengrass 元件會附加資料的 Blob。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data')
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes());
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array"));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage)

------

#### AWS IoT SiteWise 匯出目的地
<a name="streammanagerclient-append-message-sitewise"></a>

下列程式碼片段附加一個訊息到名為 `StreamName` 的串流。對於 AWS IoT SiteWise 目的地，您的 Greengrass 元件會附加序列化`PutAssetPropertyValueEntry`物件。如需詳細資訊，請參閱[匯出至 AWS IoT SiteWise](stream-export-configurations.md#export-streams-to-sitewise)。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>當您傳送資料至 時 AWS IoT SiteWise，您的資料必須符合 `BatchPutAssetPropertyValue`動作的要求。如需詳細資訊，請參閱《AWS IoT SiteWise API 參考》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier
    # than 10 minutes in the past. Add some randomness to time and offset.

    # Note: To create a new asset property data, you should use the classes defined in the
    # greengrasssdk.stream_manager module.

    time_in_nanos = TimeInNanos(
        time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000)
    )
    variant = Variant(double_value=random.random())
    asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)]
    putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset)
    sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.PutAssetPropertyValueEntry)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    Random rand = new Random();
    // Note: To create a new asset property data, you should use the classes defined in the
    // com.amazonaws.greengrass.streammanager.model.sitewise package.
    List<AssetPropertyValue> entries = new ArrayList<>() ;

    // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier
    // than 10 minutes in the past. Add some randomness to time and offset.
    final int maxTimeRandomness = 60;
    final int maxOffsetRandomness = 10000;
    double randomValue = rand.nextDouble();
    TimeInNanos timestamp = new TimeInNanos()
            .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness))
            .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness)));
    AssetPropertyValue entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);
    entries.add(entry);

    PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry()
            .withEntryId(UUID.randomUUID().toString())
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues(entries);
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/sitewise/PutAssetPropertyValueEntry.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const maxTimeRandomness = 60;
        const maxOffsetRandomness = 10000;
        const randomValue = Math.random();
        // Note: To create a new asset property data, you should use the classes defined in the
        // aws-greengrass-core-sdk StreamManager module.
        const timestamp = new TimeInNanos()
            .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness))
            .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness));
        const entry = new AssetPropertyValue()
            .withValue(new Variant().withDoubleValue(randomValue))
            .withQuality(Quality.GOOD)
            .withTimestamp(timestamp);

        const putAssetPropertyValueEntry =    new PutAssetPropertyValueEntry()
            .withEntryId(`${ENTRY_ID_PREFIX}${i}`)
            .withPropertyAlias("PropertyAlias")
            .withPropertyValues([entry]);
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [PutAssetPropertyValueEntry](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.PutAssetPropertyValueEntry.html)

------

#### Amazon S3 匯出目的地
<a name="streammanagerclient-append-message-export-task"></a>

下列程式碼片段會將匯出任務附加至名為 的串流`StreamName`。對於 Amazon S3 目的地，您的 Greengrass 元件會附加序列化`S3ExportTaskDefinition`物件，其中包含來源輸入檔案和目標 Amazon S3 物件的相關資訊。如果指定的物件不存在，串流管理員會為您建立它。如需詳細資訊，請參閱[匯出至 Amazon S3](stream-export-configurations.md#export-streams-to-s3)。

此程式碼片段有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    # Append an Amazon S3 Task definition and print the sequence number.
    s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName")
    sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition))
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[end\$1message](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.append_message) \$1 [S3ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    // Append an Amazon S3 export task definition and print the sequence number.
    S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
    long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition));
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#appendMessage-java.lang.String-byte:A-) \$1 [S3ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/S3ExportTaskDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
     // Append an Amazon S3 export task definition and print the sequence number.
     const taskDefinition = new S3ExportTaskDefinition()
        .withBucket("BucketName")
        .withKey("KeyName")
        .withInputUrl("URLToFile");
        const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition)));
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[appendMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#appendMessage) \$1 [S3ExportTaskDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskDefinition.html)

------

## 讀取訊息
<a name="streammanagerclient-read-messages"></a>

從串流讀取訊息。

### 要求
<a name="streammanagerclient-read-messages-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-read-messages-examples"></a>

下列程式碼片段可從名為 `StreamName` 的串流讀取訊息。讀取方法需要一個選用的 `ReadMessagesOptions` 物件以指定序號，從要讀取的最小、最大數字和讀取訊息的逾時開始讀取。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_list = client.read_messages(
        stream_name="StreamName",
        # By default, if no options are specified, it tries to read one message from the beginning of the stream.
        options=ReadMessagesOptions(
            desired_start_sequence_number=100,
            # Try to read from sequence number 100 or greater. By default, this is 0.
            min_message_count=10,
            # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
            max_message_count=100,    # Accept up to 100 messages. By default this is 1.
            read_timeout_millis=5000
            # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
        )
    )
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[Read\$1messages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.ReadMessagesOptions)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    List<Message> messages = client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                    // Try to read from sequence number 100 or greater. By default this is 0.
                    .withDesiredStartSequenceNumber(100L)
                    // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1.
                    .withMinMessageCount(10L)
                    // Accept up to 100 messages. By default this is 1.
                    .withMaxMessageCount(100L)
                    // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                    .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis())
    );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/ReadMessagesOptions.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messages = await client.readMessages("StreamName",
            // By default, if no options are specified, it tries to read one message from the beginning of the stream.
            new ReadMessagesOptions()
                // Try to read from sequence number 100 or greater. By default this is 0.
                .withDesiredStartSequenceNumber(100)
                // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1.
                .withMinMessageCount(10)
                // Accept up to 100 messages. By default this is 1.
                .withMaxMessageCount(100)
                // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception.
                .withReadTimeoutMillis(5 * 1000)
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [ReadMessagesOptions](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.ReadMessagesOptions.html)

------

## 列出串流
<a name="streammanagerclient-list-streams"></a>

取得串流管理員中的串流清單。

### 要求
<a name="streammanagerclient-list-streams-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-list-streams-examples"></a>

下列程式碼片段可獲取串流管理員中的串流清單 (依據名稱)。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_names = client.list_streams()
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[list\$1streams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.list_streams)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    List<String> streamNames = client.listStreams();
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[listStreams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#listStreams--)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const streams = await client.listStreams();
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[listStreams](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#listStreams)

------

## 描述訊息串流
<a name="streammanagerclient-describe-message-stream"></a>

取得串流的中繼資料，包括串流定義、大小和匯出狀態。

### 要求
<a name="streammanagerclient-describe-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-describe-message-stream-examples"></a>

下列程式碼片段可獲取名為 `StreamName` 的串流相關的中繼資料，包括串流定義、大小和匯出工具狀態。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    stream_description = client.describe_message_stream(stream_name="StreamName")
    if stream_description.export_statuses[0].error_message:
        # The last export of export destination 0 failed with some error
        # Here is the last sequence number that was successfully exported
        stream_description.export_statuses[0].last_exported_sequence_number
 
    if (stream_description.storage_status.newest_sequence_number >
            stream_description.export_statuses[0].last_exported_sequence_number):
        pass
        # The end of the stream is ahead of the last exported sequence number
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考： [describe\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.describe_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    MessageStreamInfo description = client.describeMessageStream("StreamName");
    String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage();
    if (lastErrorMessage != null && !lastErrorMessage.equals("")) {
        // The last export of export destination 0 failed with some error.
        // Here is the last sequence number that was successfully exported.
        description.getExportStatuses().get(0).getLastExportedSequenceNumber();
    }
 
    if (description.getStorageStatus().getNewestSequenceNumber() >
            description.getExportStatuses().get(0).getLastExportedSequenceNumber()) {
        // The end of the stream is ahead of the last exported sequence number.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考： [describeMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#describeMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const description = await client.describeMessageStream("StreamName");
        const lastErrorMessage = description.exportStatuses[0].errorMessage;
        if (lastErrorMessage) {
            // The last export of export destination 0 failed with some error.
            // Here is the last sequence number that was successfully exported.
            description.exportStatuses[0].lastExportedSequenceNumber;
        }
 
        if (description.storageStatus.newestSequenceNumber >
            description.exportStatuses[0].lastExportedSequenceNumber) {
            // The end of the stream is ahead of the last exported sequence number.
        }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考： [describeMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#describeMessageStream)

------

## 更新訊息串流
<a name="streammanagerclient-update-message-stream"></a>

更新現有串流的屬性。如果您的需求在建立串流之後有所變更，建議您更新串流。例如：
+ 新增 AWS 雲端 目的地的新[匯出組態](stream-export-configurations.md)。
+ 增加串流的大小上限，以變更資料匯出或保留的方式。例如，串流大小結合您在完整設定上的策略，可能會導致資料遭到刪除或拒絕，然後串流管理員才能處理資料。
+ 暫停和繼續匯出；例如，如果匯出任務長時間執行，而且您想要對上傳資料進行評價。

您的 Greengrass 元件遵循此高階程序來更新串流：

1. [取得串流的描述。](#streammanagerclient-describe-message-stream)

1. 更新對應`MessageStreamDefinition`和次級物件的目標屬性。

1. 在更新的 中傳遞 `MessageStreamDefinition`。請務必包含更新串流的完整物件定義。未定義的屬性會還原為預設值。

   您可以指定要用作匯出中起始訊息的訊息序號。

### 要求
<a name="streammanagerclient-update-message-streamreqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-update-message-stream-examples"></a>

下列程式碼片段會更新名為 的串流`StreamName`。它會更新匯出至 Kinesis Data Streams 之串流的多個屬性。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    message_stream_info = client.describe_message_stream(STREAM_NAME)
    message_stream_info.definition.max_size=536870912
    message_stream_info.definition.stream_segment_size=33554432
    message_stream_info.definition.time_to_live_millis=3600000
    message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData
    message_stream_info.definition.persistence=Persistence.Memory
    message_stream_info.definition.flush_on_write=False
    message_stream_info.definition.export_definition.kinesis=
        [KinesisConfig(    
            # Updating Export definition to add a Kinesis Stream configuration.
            identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))]
    client.update_message_stream(message_stream_info.definition)
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[updateMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.update_message_stream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.MessageStreamDefinition)

------
#### [ Java ]

```
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) {
    MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME);
    // Update the message stream with new values.
    client.updateMessageStream(
        messageStreamInfo.getDefinition()
            .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data.
            // Max Size update should be greater than initial Max Size defined in Create Message Stream request
            .withMaxSize(536870912L) // Update Max Size to 512 MB.
            .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB.
            .withFlushOnWrite(true) // Update flush on write to true.
            .withPersistence(Persistence.Memory) // Update the persistence to Memory.
            .withTimeToLiveMillis(3600000L)    // Update TTL to 1 hour.
            .withExportDefinition(
                // Optional. Choose where/how the stream is exported to the AWS 雲端.
                messageStreamInfo.getDefinition().getExportDefinition().
                    // Updating Export definition to add a Kinesis Stream configuration.
                    .withKinesis(new ArrayList<KinesisConfig>() {{
                        add(new KinesisConfig()
                            .withIdentifier(EXPORT_IDENTIFIER)
                            .withKinesisStreamName("test"));
                        }})
            );
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[update\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#updateMessageStream-java.lang.String-) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/MessageStreamDefinition.html)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        const messageStreamInfo = await c.describeMessageStream(STREAM_NAME);
        await client.updateMessageStream(
            messageStreamInfo.definition
                // Max Size update should be greater than initial Max Size defined in Create Message Stream request
                .withMaxSize(536870912)    // Default is 256 MB. Updating Max Size to 512 MB.
                .withStreamSegmentSize(33554432)    // Default is 16 MB. Updating Segment Size to 32 MB.
                .withTimeToLiveMillis(3600000)    // By default, no TTL is enabled. Update TTL to 1 hour.
                .withStrategyOnFull(StrategyOnFull.RejectNewData)    // Required. Updating Strategy on full to reject new data.
                .withPersistence(Persistence.Memory)    // Default is File. Update the persistence to Memory
                .withFlushOnWrite(true)    // Default is false. Updating to true.
                .withExportDefinition(    
                    // Optional. Choose where/how the stream is exported to the AWS 雲端.
                    messageStreamInfo.definition.exportDefinition
                        // Updating Export definition to add a Kinesis Stream configuration.
                        .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())])
                )
        );
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[updateMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#updateMessageStream) \$1 [MessageStreamDefinition](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.MessageStreamDefinition.html)

------

### 更新串流的限制
<a name="streammanagerclient-update-constraints"></a>

更新串流時，適用下列限制條件。除非以下清單中另有說明，否則更新會立即生效。
+ 您無法更新串流的持久性。若要變更此行為，[請刪除串流](#streammanagerclient-delete-message-stream)並[建立定義新持久性政策的串流](#streammanagerclient-create-message-stream)。
+ 您只能在下列情況下更新串流的大小上限：
  + 大小上限必須大於或等於串流的目前大小。<a name="messagestreaminfo-describe-stream"></a>若要尋找此資訊，[請描述串流](#streammanagerclient-describe-message-stream)，然後檢查傳回`MessageStreamInfo`物件的儲存狀態。
  + 大小上限必須大於或等於串流的區段大小。
+ 您可以將串流區段大小更新為小於串流大小上限的值。更新的設定會套用至新的客群。
+ 存留時間 (TTL) 屬性的更新適用於新的附加操作。如果您減少此值，串流管理員也可能會刪除超過 TTL 的現有區段。
+ 完整屬性策略的更新適用於新的附加操作。如果您設定策略來覆寫最舊的資料，串流管理員也可能會根據新設定覆寫現有的區段。
+ 寫入屬性排清的更新會套用至新訊息。
+ 匯出組態的更新會套用至新的匯出。更新請求必須包含您要支援的所有匯出組態。否則，串流管理員會刪除它們。
  + 當您更新匯出組態時，請指定目標匯出組態的識別符。
  + 若要新增匯出組態，請指定新匯出組態的唯一識別符。
  + 若要刪除匯出組態，請省略匯出組態。
+ 若要[更新](#streammanagerclient-update-message-stream)串流中匯出組態的起始序號，您必須指定小於最新序號的值。<a name="messagestreaminfo-describe-stream"></a>若要尋找此資訊，[請描述串流](#streammanagerclient-describe-message-stream)，然後檢查傳回`MessageStreamInfo`物件的儲存狀態。

## 刪除訊息串流
<a name="streammanagerclient-delete-message-stream"></a>

刪除串流。刪除串流時，磁碟中該串流的所有儲存資料都會刪除。

### 要求
<a name="streammanagerclient-delete-message-stream-reqs"></a>

此操作有下列需求：
+ <a name="streammanagerclient-min-sm-sdk"></a>最低串流管理員 SDK 版本：Python：1.1.0  \$1  Java：1.1.0  \$1  Node.js：1.1.0

### 範例
<a name="streammanagerclient-delete-message-stream-examples"></a>

下列程式碼片段會刪除名為 `StreamName` 的串流。

------
#### [ Python ]

```
client = StreamManagerClient()
 
try:
    client.delete_message_stream(stream_name="StreamName")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[deleteMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.delete_message_stream)

------
#### [ Java ]

```
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    client.deleteMessageStream("StreamName");
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[Delete\$1message\$1stream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#deleteMessageStream-java.lang.String-)

------
#### [ Node.js ]

```
const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        await client.deleteMessageStream("StreamName");
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[deleteMessageStream](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#deleteMessageStream)

------

## 另請參閱
<a name="work-with-streams-see-also"></a>
+ [管理 Greengrass 核心裝置上的資料串流](manage-data-streams.md)
+ [設定 AWS IoT Greengrass 串流管理員](configure-stream-manager.md)
+ [匯出支援 AWS 雲端 目的地的組態](stream-export-configurations.md)
+ `StreamManagerClient` 串流管理員 SDK 參考中的 ：
  + [Python](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html)
  + [Java](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html)
  + [Node.js](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html)

# 匯出支援 AWS 雲端 目的地的組態
<a name="stream-export-configurations"></a>

使用者定義的 Greengrass 元件會在 Stream Manager SDK `StreamManagerClient`中使用 與串流管理員互動。當元件[建立串流](work-with-streams.md#streammanagerclient-create-message-stream)或[更新串流](work-with-streams.md#streammanagerclient-create-message-stream)時，它會傳遞代表串流屬性的`MessageStreamDefinition`物件，包括匯出定義。`ExportDefinition` 物件包含為串流定義的匯出組態。串流管理員使用這些匯出組態來判斷匯出串流的位置和方式。

![\[ExportDefinition 屬性類型的物件模型圖表。\]](http://docs.aws.amazon.com/zh_tw/greengrass/v2/developerguide/images/stream-manager-exportconfigs.png)


您可以在串流上定義零或多個匯出組態，包括單一目的地類型的多個匯出組態。例如，您可以將串流匯出至兩個 AWS IoT Analytics 頻道和一個 Kinesis 資料串流。

對於失敗的匯出嘗試，串流管理員會持續重試 AWS 雲端 以最多五分鐘的間隔將資料匯出至 。重試嘗試次數沒有上限。

**注意**  
<a name="streammanagerclient-http-config"></a>`StreamManagerClient` 也提供目標目的地，可用來將串流匯出至 HTTP 伺服器。此目標僅供測試之用。它不穩定或不支援在生產環境中使用。

**Topics**
+ [AWS IoT Analytics 頻道](#export-to-iot-analytics)
+ [Amazon Kinesis 資料串流](#export-to-kinesis)
+ [AWS IoT SiteWise 資產屬性](#export-to-iot-sitewise)
+ [Amazon S3 物件](#export-to-s3)

您需負責維護這些 AWS 雲端 資源。

## AWS IoT Analytics 頻道
<a name="export-to-iot-analytics"></a>

串流管理員支援自動匯出至 AWS IoT Analytics。 <a name="ita-export-destination"></a>AWS IoT Analytics 可讓您對資料執行進階分析，以協助做出商業決策並改善機器學習模型。如需詳細資訊，請參閱*AWS IoT Analytics 《 使用者指南*》中的[什麼是 AWS IoT Analytics？](https://docs.aws.amazon.com/iotanalytics/latest/userguide/welcome.html)。

在 Stream Manager SDK 中，您的 Greengrass 元件會使用 `IoTAnalyticsConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [IoTAnalyticsConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTAnalyticsConfig) 
+ Java 開發套件中的 [IoTAnalyticsConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTAnalyticsConfig.html) 
+ Node.js SDK 中的 [IoTAnalyticsConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTAnalyticsConfig.html) 

### 要求
<a name="export-to-iot-analytics-reqs"></a>

此匯出目的地有下列需求：
+ 中的目標頻道 AWS IoT Analytics 必須與 Greengrass 核心裝置位於相同的 AWS 帳戶 和 AWS 區域 中。
+ 必須[授權核心裝置與 AWS 服務互動](device-service-role.md)允許 以頻道為目標的`iotanalytics:BatchPutMessage`許可。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "iotanalytics:BatchPutMessage"
        ],
        "Resource": [
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_1_name",
          "arn:aws:iotanalytics:us-east-1:123456789012:channel/channel_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 AWS IoT Analytics
<a name="export-streams-to-iot-analytics"></a>

若要建立匯出至 的串流 AWS IoT Analytics，您的 Greengrass [元件會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`IoTAnalyticsConfig`物件的匯出定義串流。此物件會定義匯出設定，例如目標頻道、批次大小、批次間隔和優先順序。

當您的 Greengrass 元件從裝置接收資料時，它們會將包含資料 Blob [的訊息附加](work-with-streams.md#streammanagerclient-append-message)到目標串流。

然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

## Amazon Kinesis 資料串流
<a name="export-to-kinesis"></a>

串流管理員支援自動匯出至 Amazon Kinesis Data Streams。<a name="aks-export-destination"></a>Kinesis Data Streams 通常用於彙總大量資料，並將其載入資料倉儲或 MapReduce 叢集。如需詳細資訊，請參閱[《Amazon Kinesis 開發人員指南》中的什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/what-is-this-service.html)。 *Amazon Kinesis * 

在 Stream Manager SDK 中，您的 Greengrass 元件會使用 `KinesisConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.KinesisConfig) 
+ Java 開發套件中的 [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/KinesisConfig.html) 
+ Node.js SDK 中的 [KinesisConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.KinesisConfig.html) 

### 要求
<a name="export-to-kinesis-reqs"></a>

此匯出目的地有下列需求：
+ Kinesis Data Streams 中的目標串流必須與 Greengrass 核心裝置位於相同的 AWS 帳戶 和 AWS 區域 中。
+ （建議） 串流管理員 v2.2.1 可改善將串流匯出至 Kinesis Data Streams 目的地的效能。若要使用此最新版本的改進功能，請將[串流管理員元件](stream-manager-component.md)升級至 v2.2.1，並使用 [Greengrass 權杖交換角色](device-service-role.md)中的`kinesis:ListShards`政策。
+ 必須[授權核心裝置與 AWS 服務互動](device-service-role.md)允許 以資料串流為目標的`kinesis:PutRecords`許可。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "kinesis:PutRecords"
        ],
        "Resource": [
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_1_name",
          "arn:aws:kinesis:us-east-1:123456789012:stream/stream_2_name"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 Kinesis Data Streams
<a name="export-streams-to-kinesis"></a>

若要建立匯出至 Kinesis Data Streams 的串流，您的 Greengrass [元件會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`KinesisConfig`物件的匯出定義串流。此物件會定義匯出設定，例如目標資料串流、批次大小、批次間隔和優先順序。

當您的 Greengrass 元件從裝置接收資料時，它們會將包含資料 Blob [的訊息附加](work-with-streams.md#streammanagerclient-append-message)到目標串流。然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

串流管理員會為每個上傳至 Amazon Kinesis 的記錄產生唯一的隨機 UUID 做為分割區索引鍵。

## AWS IoT SiteWise 資產屬性
<a name="export-to-iot-sitewise"></a>

串流管理員支援自動匯出至 AWS IoT SiteWise。 <a name="itsw-export-destination"></a>AWS IoT SiteWise 可讓您大規模收集、組織和分析工業設備的資料。如需詳細資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的[什麼是 AWS IoT SiteWise？](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/what-is-sitewise.html)。

在 Stream Manager SDK 中，您的 Greengrass 元件會使用 `IoTSiteWiseConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [IoTSiteWiseConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.IoTSiteWiseConfig) 
+ Java 開發套件中的 [IoTSiteWiseConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/IoTSiteWiseConfig.html) 
+ Node.js SDK 中的 [IoTSiteWiseConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.IoTSiteWiseConfig.html) 

**注意**  
AWS 也提供 AWS IoT SiteWise 元件，提供預先建置的解決方案，可讓您用來從 OPC-UA 來源串流資料。如需詳細資訊，請參閱[IoT SiteWise OPC UA 收集器](iotsitewise-opcua-collector-component.md)。

### 需求
<a name="export-to-iot-sitewise-reqs"></a>

此匯出目的地有下列需求：
+ 中的目標資產屬性 AWS IoT SiteWise 必須與 Greengrass 核心裝置位於相同的 AWS 帳戶 和 AWS 區域 中。
**注意**  
如需 AWS 區域 AWS IoT SiteWise 支援的 清單，請參閱《 *AWS 一般參考*》中的[AWS IoT SiteWise 端點和配額](https://docs.aws.amazon.com/general/latest/gr/iot-sitewise.html#iot-sitewise_region)。
+ 必須[授權核心裝置與 AWS 服務互動](device-service-role.md)允許 以資產屬性為目標的`iotsitewise:BatchPutAssetPropertyValue`許可。下列範例政策使用 `iotsitewise:assetHierarchyPath`條件索引鍵來授予目標根資產及其子資產的存取權。您可以從`Condition`政策中移除 ，以允許存取所有 AWS IoT SiteWise 資產或指定個別資產ARNs。

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
         "Effect": "Allow",
         "Action": "iotsitewise:BatchPutAssetPropertyValue",
         "Resource": "*",
         "Condition": {
           "StringLike": {
             "iotsitewise:assetHierarchyPath": [
               "/root node asset ID",
               "/root node asset ID/*"
             ]
           }
         }
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

  如需重要的安全性資訊，請參閱*AWS IoT SiteWise 《 使用者指南*》中的 [ BatchPutAssetPropertyValue 授權](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/security_iam_service-with-iam.html#security_iam_service-with-iam-id-based-policies-batchputassetpropertyvalue-action)。

### 匯出至 AWS IoT SiteWise
<a name="export-streams-to-sitewise"></a>

若要建立匯出至 的串流 AWS IoT SiteWise，您的 Greengrass [元件會建立](work-with-streams.md#streammanagerclient-create-message-stream)包含一或多個`IoTSiteWiseConfig`物件的匯出定義串流。此物件會定義匯出設定，例如批次大小、批次間隔和優先順序。

當您的 Greengrass 元件從裝置接收資產屬性資料時，它們會將包含資料的訊息附加到目標串流。訊息是 JSON 序列化`PutAssetPropertyValueEntry`物件，其中包含一或多個資產屬性的屬性值。如需詳細資訊，請參閱[附加訊息](work-with-streams.md#streammanagerclient-append-message-sitewise)以取得 AWS IoT SiteWise 匯出目的地。

**注意**  
<a name="BatchPutAssetPropertyValue-data-reqs"></a>當您傳送資料至 時 AWS IoT SiteWise，您的資料必須符合 `BatchPutAssetPropertyValue`動作的要求。如需詳細資訊，請參閱《AWS IoT SiteWise API 參考》**中的 [BatchPutAssetPropertyValue](https://docs.aws.amazon.com/iot-sitewise/latest/APIReference/API_BatchPutAssetPropertyValue.html)。

然後，串流管理員會根據串流匯出組態中定義的批次設定和優先順序匯出資料。

您可以調整串流管理員設定和 Greengrass 元件邏輯，以設計匯出策略。例如：
+ 對於近乎即時的匯出，請設定低批次大小和間隔設定，並在接收資料時將資料附加至串流。
+ 為了最佳化批次處理、減輕頻寬限制或將成本降至最低，您的 Greengrass 元件可以在將資料附加至串流之前，將單一資產屬性收到的timestamp-quality-value(TQV) 資料點合併。一種策略是在一個訊息中批次處理最多 10 個不同的屬性資產組合或屬性別名的項目，而不是為相同的屬性傳送多個項目。這有助於串流管理員保持在[AWS IoT SiteWise 配額](https://docs.aws.amazon.com/iot-sitewise/latest/userguide/quotas.html)內。

## Amazon S3 物件
<a name="export-to-s3"></a>

串流管理員支援自動匯出至 Amazon S3。<a name="s3-export-destination"></a>您可以使用 Amazon S3 來存放和擷取大量資料。如需詳細資訊，請參閱《[Amazon Simple Storage Service 開發人員指南》中的什麼是 Amazon S3？](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)。 **

在 Stream Manager SDK 中，您的 Greengrass 元件會使用 `S3ExportTaskExecutorConfig` 來定義此目的地類型的匯出組態。如需詳細資訊，請參閱目標語言的 SDK 參考：
+ Python SDK 中的 [S3ExportTaskExecutorConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.S3ExportTaskExecutorConfig) 
+ Java 開發套件中的 [S3ExportTaskExecutorConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/export/S3ExportTaskExecutorConfig.html) 
+ Node.js SDK 中的 [S3ExportTaskExecutorConfig](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.S3ExportTaskExecutorConfig.html) 

### 要求
<a name="export-to-s3-reqs"></a>

此匯出目的地有下列需求：
+ 目標 Amazon S3 儲存貯體必須與 Greengrass 核心裝置 AWS 帳戶 位於相同的 中。
+ 如果在 **Greengrass 容器**模式下執行的 Lambda 函數將輸入檔案寫入輸入檔案目錄，您必須將目錄掛載為具有寫入許可的容器中的磁碟區。這可確保檔案寫入根檔案系統，並在容器外部執行的串流管理員元件可見。
+ 如果 Docker 容器元件將輸入檔案寫入輸入檔案目錄，您必須將目錄掛載為具有寫入許可的容器中的磁碟區。這可確保檔案寫入根檔案系統，並在容器外部執行的串流管理員元件可見。
+ 必須[授權核心裝置與 AWS 服務互動](device-service-role.md)允許目標儲存貯體的下列許可。例如：

------
#### [ JSON ]

****  

  ```
  {
    "Version":"2012-10-17",		 	 	 
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "s3:PutObject",
          "s3:AbortMultipartUpload",
          "s3:ListMultipartUploadParts"
        ],
        "Resource": [
          "arn:aws:s3:::bucket-1-name/*",
          "arn:aws:s3:::bucket-2-name/*"
        ]
      }
    ]
  }
  ```

------

  <a name="wildcards-grant-granular-conditional-access"></a>您可以使用萬用字元`*`命名機制，授予對資源的精細或條件式存取。如需詳細資訊，請參閱《[IAM 使用者指南》中的新增和移除 IAM 政策](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html)。 **

### 匯出至 Amazon S3
<a name="export-streams-to-s3"></a>

若要建立匯出至 Amazon S3 的串流，您的 Greengrass 元件會使用 `S3ExportTaskExecutorConfig` 物件來設定匯出政策。政策會定義匯出設定，例如分段上傳閾值和優先順序。對於 Amazon S3 匯出，串流管理員會上傳從核心裝置上的本機檔案讀取的資料。若要啟動上傳，您的 Greengrass 元件會將匯出任務附加至目標串流。匯出任務包含輸入檔案和目標 Amazon S3 物件的相關資訊。串流管理員會以任務附加至串流的順序執行任務。

**注意**  
 <a name="bucket-not-key-must-exist"></a>目標儲存貯體必須已存在於您的 中 AWS 帳戶。如果指定金鑰的物件不存在，串流管理員會為您建立物件。

串流管理員使用分段上傳閾值屬性、[最小分段大小](configure-stream-manager.md#stream-manager-minimum-part-size)設定和輸入檔案的大小來決定如何上傳資料。分段上傳閾值必須大於或等於最小分段大小。如果您想要平行上傳資料，您可以建立多個串流。

指定目標 Amazon S3 物件的金鑰可以在`!{timestamp:value}`預留位置中包含有效的 [Java DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 字串。您可以根據輸入檔案資料上傳的時間，使用這些時間戳記預留位置來分割 Amazon S3 中的資料。例如，下列金鑰名稱會解析為 等值`my-key/2020/12/31/data.txt`。

```
my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
```

**注意**  
如果您想要監控串流的匯出狀態，請先建立狀態串流，然後將匯出串流設定為使用它。如需詳細資訊，請參閱[監控匯出任務](#monitor-export-status-s3)。

#### 管理輸入資料
<a name="manage-s3-input-data"></a>

您可以編寫程式碼，讓 IoT 應用程式用來管理輸入資料的生命週期。下列工作流程範例示範如何使用 Greengrass 元件來管理此資料。

1. 本機程序會從裝置或周邊接收資料，然後將資料寫入核心裝置上的目錄中的檔案。這些是串流管理員的輸入檔案。

1. Greengrass 元件會掃描目錄，並在[建立新檔案時將匯出任務附加](work-with-streams.md#streammanagerclient-append-message-export-task)至目標串流。任務是 JSON 序列化`S3ExportTaskDefinition`物件，可指定輸入檔案的 URL、目標 Amazon S3 儲存貯體和金鑰，以及選用的使用者中繼資料。

1. 串流管理員會讀取輸入檔案，並依附加任務的順序將資料匯出至 Amazon S3。<a name="bucket-not-key-must-exist"></a>目標儲存貯體必須已存在於您的 中 AWS 帳戶。如果指定金鑰的物件不存在，串流管理員會為您建立物件。

1. Greengrass 元件會從狀態串流[讀取訊息](work-with-streams.md#streammanagerclient-read-messages)，以監控匯出狀態。匯出任務完成後，Greengrass 元件可以刪除對應的輸入檔案。如需詳細資訊，請參閱[監控匯出任務](#monitor-export-status-s3)。

### 監控匯出任務
<a name="monitor-export-status-s3"></a>

您可以編寫程式碼，讓 IoT 應用程式用來監控 Amazon S3 匯出的狀態。您的 Greengrass 元件必須建立狀態串流，然後將匯出串流設定為將狀態更新寫入狀態串流。單一狀態串流可以從匯出至 Amazon S3 的多個串流接收狀態更新。

首先，[建立](work-with-streams.md#streammanagerclient-create-message-stream)要用作狀態串流的串流。您可以設定串流的大小和保留政策，以控制狀態訊息的生命週期。例如：
+ `Memory` 如果您不想儲存狀態訊息，請將 `Persistence`設定為 。
+ `StrategyOnFull` 設定為 ，`OverwriteOldestData`以免遺失新的狀態訊息。

然後，建立或更新匯出串流以使用狀態串流。具體而言，請設定串流`S3ExportTaskExecutorConfig`匯出組態的狀態組態屬性。此設定會告知串流管理員將有關匯出任務的狀態訊息寫入狀態串流。在 `StatusConfig` 物件中，指定狀態串流的名稱和詳細程度。下列支援的值範圍從最詳細 (`ERROR`) 到最詳細 (`TRACE`)。預設值為 `INFO`。
+ `ERROR`
+ `WARN`
+ `INFO`
+ `DEBUG`
+ `TRACE`

下列工作流程範例顯示 Greengrass 元件如何使用狀態串流來監控匯出狀態。

1. 如先前工作流程所述，Greengrass 元件[會將匯出任務附加](work-with-streams.md#streammanagerclient-append-message-export-task)至設定為將有關匯出任務的狀態訊息寫入狀態串流的串流。附加操作會傳回代表任務 ID 的序號。

1. Greengrass 元件會從狀態串流循序[讀取訊息](work-with-streams.md#streammanagerclient-read-messages)，然後根據串流名稱和任務 ID 或根據訊息內容的匯出任務屬性來篩選訊息。例如，Greengrass 元件可以依匯出任務的輸入檔案 URL 進行篩選，該 URL 由訊息內容中的`S3ExportTaskDefinition`物件表示。

   下列狀態碼表示匯出任務已達到已完成狀態：
   + `Success`。 上傳已成功完成。
   + `Failure`。 串流管理員遇到錯誤，例如，指定的儲存貯體不存在。解決問題後，您可以再次將匯出任務附加至串流。
   + `Canceled`。 任務已停止，因為串流或匯出定義已刪除，或任務的time-to-live(TTL) 期間已過期。
**注意**  
任務也可能的狀態為 `InProgress`或 `Warning`。當事件傳回不會影響任務執行的錯誤時，串流管理員會發出警告。例如，無法清除部分上傳會傳回警告。

1. 匯出任務完成後，Greengrass 元件可以刪除對應的輸入檔案。

下列範例顯示 Greengrass 元件如何讀取和處理狀態訊息。

------
#### [ Python ]

```
import time
from stream_manager import (
    ReadMessagesOptions,
    Status,
    StatusConfig,
    StatusLevel,
    StatusMessage,
    StreamManagerClient,
)
from stream_manager.util import Util

client = StreamManagerClient()
 
try:
    # Read the statuses from the export status stream
    is_file_uploaded_to_s3 = False
    while not is_file_uploaded_to_s3:
        try:
            messages_list = client.read_messages(
                "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000)
            )
            for message in messages_list:
                # Deserialize the status message first.
                status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage)

                # Check the status of the status message. If the status is "Success",
                # the file was successfully uploaded to S3.
                # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3.
                # We will print the message for why the upload to S3 failed from the status message.
                # If the status was "InProgress", the status indicates that the server has started uploading
                # the S3 task.
                if status_message.status == Status.Success:
                    logger.info("Successfully uploaded file at path " + file_url + " to S3.")
                    is_file_uploaded_to_s3 = True
                elif status_message.status == Status.Failure or status_message.status == Status.Canceled:
                    logger.info(
                        "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message
                    )
                    is_file_uploaded_to_s3 = True
            time.sleep(5)
        except StreamManagerException:
            logger.exception("Exception while running")
except StreamManagerException:
    pass
    # Properly handle errors.
except ConnectionError or asyncio.TimeoutError:
    pass
    # Properly handle errors.
```

Python SDK 參考：[read\$1messages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.streammanagerclient.html#stream_manager.streammanagerclient.StreamManagerClient.read_messages) \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-python/_apidoc/stream_manager.data.html#stream_manager.data.StatusMessage)

------
#### [ Java ]

```
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient;
import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory;
import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize;
import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions;
import com.amazonaws.greengrass.streammanager.model.Status;
import com.amazonaws.greengrass.streammanager.model.StatusConfig;
import com.amazonaws.greengrass.streammanager.model.StatusLevel;
import com.amazonaws.greengrass.streammanager.model.StatusMessage;

 try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) {
    try {
        boolean isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                List<Message> messages = client.readMessages("StatusStreamName",
                    new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L));
                for (Message message : messages) {
                    // Deserialize the status message first.
                    StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class);
                    // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3.
                    // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (Status.Success.equals(statusMessage.getStatus())) {
                        System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3.");
                        isS3UploadComplete = true;
                     } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) {
                        System.out.println(String.format("Unable to upload file at path %s to S3. Message %s",
                            statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(),
                            statusMessage.getMessage()));
                        sS3UploadComplete = true;
                    }
                }
            } catch (StreamManagerException ignored) {
            } finally {
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                Thread.sleep(5000);
            }
        } catch (e) {
        // Properly handle errors.
    }
} catch (StreamManagerException e) {
    // Properly handle exception.
}
```

Java 開發套件參考：[readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/client/StreamManagerClient.html#readMessages-java.lang.String-com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions-) \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-java/com/amazonaws/greengrass/streammanager/model/StatusMessage.html)

------
#### [ Node.js ]

```
const {
    StreamManagerClient, ReadMessagesOptions,
    Status, StatusConfig, StatusLevel, StatusMessage,
    util,
} = require(*'aws-greengrass-stream-manager-sdk'*);

const client = new StreamManagerClient();
client.onConnected(async () => {
    try {
        let isS3UploadComplete = false;
        while (!isS3UploadComplete) {
            try {
                // Read the statuses from the export status stream
                const messages = await c.readMessages("StatusStreamName",
                    new ReadMessagesOptions()
                        .withMinMessageCount(1)
                        .withReadTimeoutMillis(1000));

                messages.forEach((message) => {
                    // Deserialize the status message first.
                    const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage);
                    // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3.
                    // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3.
                    // We will print the message for why the upload to S3 failed from the status message.
                    // If the status was "InProgress", the status indicates that the server has started uploading the S3 task.
                    if (statusMessage.status === Status.Success) {
                        console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`);
                        isS3UploadComplete = true;
                    } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) {
                        console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`);
                        isS3UploadComplete = true;
                    }
                });
                // Sleep for sometime for the S3 upload task to complete before trying to read the status message.
                await new Promise((r) => setTimeout(r, 5000));
            } catch (e) {
                // Ignored
            }
    } catch (e) {
        // Properly handle errors.
    }
});
client.onError((err) => {
    // Properly handle connection errors.
    // This is called only when the connection to the StreamManager server fails.
});
```

Node.js SDK 參考：[readMessages](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StreamManagerClient.html#readMessages) \$1 [StatusMessage](https://aws-greengrass.github.io/aws-greengrass-stream-manager-sdk-js/aws-greengrass-core-sdk.StreamManager.StatusMessage.html)

------