

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon Kinesis Producer Library (KPL) 開發生產者
<a name="developing-producers-with-kpl"></a>

Amazon Kinesis Data Streams 生產者是將使用者資料記錄放入 Kinesis 資料串流 (又稱為*資料擷取*) 的應用程式。Amazon Kinesis Producer Library (KPL) 可簡化生產者應用程式開發，讓開發人員達到 Kinesis 資料串流的高寫入輸送量。

您可以使用 Amazon CloudWatch 對 KPL 進行監控。如需詳細資訊，請參閱[使用 Amazon CloudWatch 監控 Kinesis Producer Library](monitoring-with-kpl.md)。

**Topics**
+ [檢閱 KPL 的角色](#developing-producers-with-kpl-role)
+ [了解使用 KPL 的優點](#developing-producers-with-kpl-advantage)
+ [了解何時不使用 KPL](#developing-producers-with-kpl-when)
+ [安裝 KPL](kinesis-kpl-dl-install.md)
+ [從 KPL 0.x 遷移至 KPL 1.x](kpl-migration-1x.md)
+ [轉換至 KPL 的 Amazon Trust Services (ATS) 憑證](kinesis-kpl-upgrades.md)
+ [KPL 支援的平台](kinesis-kpl-supported-plats.md)
+ [KPL 關鍵概念](kinesis-kpl-concepts.md)
+ [整合 KPL 與生產者程式碼](kinesis-kpl-integration.md)
+ [使用 KPL 寫入 Kinesis 資料串流](kinesis-kpl-writing.md)
+ [設定 Amazon Kinesis Producer Library](kinesis-kpl-config.md)
+ [實作消費者取消彙總](kinesis-kpl-consumer-deaggregation.md)
+ [搭配 Amazon Data Firehose 使用 KPL](kpl-with-firehose.md)
+ [使用 KPL 搭配 AWS Glue 結構描述登錄檔](kpl-with-schemaregistry.md)
+ [設定 KPL 代理組態](kpl-proxy-configuration.md)
+ [KPL 版本生命週期政策](kpl-version-lifecycle-policy.md)

**注意**  
建議您升級至最新 KPL 版本。KPL 會定期更新為更新的版本，其中包括最新的相依性和安全性修補程式、錯誤修正，以及向後相容的新功能。如需詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/)。

## 檢閱 KPL 的角色
<a name="developing-producers-with-kpl-role"></a>

KPL 是一套具高度可設定性的易用程式庫，能協助您對 Kinesis 資料串流進行寫入。此程式庫在您的生產者應用程式的程式碼與 Kinesis Data Streams API 動作之間擔任媒介。KPL 將執行以下主要任務：
+ 利用可設定的自動重試機制對一個或多個 Kinesis 資料串流進行寫入
+ 收集記錄並於每次請求時使用 `PutRecords` 將多筆記錄寫入多個碎片
+ 彙整使用者記錄以增加承載大小並提高傳輸量
+ 與 [Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) (KCL) 無縫整合以取消彙整取用者上的批次記錄
+ 代表您提交 Amazon CloudWatch 指標以提供關於生產者效能的資訊

請注意，KPL 與 [AWS SDK](https://aws.amazon.com/tools/) 所提供的 Kinesis Data Streams API 不同。Kinesis Data Streams API 可協助您管理 Kinesis Data Streams 的許多層面 (包括建立串流、重新分片、放入與取得記錄)，而 KPL 提供專用於擷取資料的抽象層。如需 Kinesis Data Streams API 的相關資訊，請參閱 [Amazon Kinesis API 參考](https://docs.aws.amazon.com/kinesis/latest/APIReference/)。

## 了解使用 KPL 的優點
<a name="developing-producers-with-kpl-advantage"></a>

以下清單舉出使用 KPL 開發 Kinesis Data Streams 生產者的一些主要優點。

KPL 可用於同步或非同步使用案例。建議您使用具有較高效能的非同步界面，除非有具體的原因需要使用同步操作。如需以上兩種使用案例的詳細資訊及範例程式碼，請參閱[使用 KPL 寫入 Kinesis 資料串流](kinesis-kpl-writing.md)。

 **效能優勢**   
KPL 可協助建置高效能的生產者。試想以下情況：您的 Amazon EC2 執行個體擔任代理，從數以百計或千計的低功率裝置收集 100 位元組的事件並將記錄寫入 Kinesis 資料串流。這些 EC2 執行個體每秒均須將成千個事件寫入您的資料串流。為達到所需的傳輸量，生產者必須實作複雜的邏輯 (如批次處理或多執行緒) 和重試邏輯並在消費者端取消彙整記錄。KPL 將為您執行所有這些任務。

 **消費者端易用性**   
對於使用 KCL 的 Java 取用者端開發人員來說，KPL 整合毫不費力。當 KCL 擷取含有多筆 KPL 使用者記錄的彙整 Kinesis Data Streams 記錄時，會自動叫用 KPL 先擷取個別的使用者記錄，然後再將其傳回給使用者。  
取用者端開發人員若未使用 KCL 而是直接使用 API `GetRecords` 操作，則可利用 KPL Java 程式庫擷取個別的使用者記錄後再將其傳回給使用者。

 **生產者監控**   
您可使用 Amazon CloudWatch 和 KPL 收集、監控及分析您的 Kinesis Data Streams 生產者。KPL 將代表您向 CloudWatch 發出輸送量指標、錯誤指標及其他指標，並可設定成進行串流層級、碎片層級或生產者層級監控。

 **非同步架構**   
由於 KPL 可能會在將記錄傳送到 Kinesis Data Streams 之前緩衝記錄，因此不會強制發起人應用程式封鎖並等待確認記錄已送達伺服器，然後再繼續執行時間。將記錄放入 KPL 的呼叫一律立即傳回，不會等待傳送記錄或接收伺服器的回應。反而，其將建立 `Future` 物件用以稍後接收傳送記錄至 Kinesis Data Streams 的結果。這與 AWS SDK 中的非同步用戶端的行為相同。

## 了解何時不使用 KPL
<a name="developing-producers-with-kpl-when"></a>

KPL 可能引發程式庫中額外的處理延遲，最高達到 `RecordMaxBufferedTime` (使用者可設定)。`RecordMaxBufferedTime` 的值愈大，壓縮效率以及效能愈高。無法容忍此額外延遲的應用程式可能需要直接使用 AWS SDK。如需搭配 Kinesis Data Streams 使用 AWS SDK 的詳細資訊，請參閱 [使用 Amazon Kinesis Data Streams API 搭配 開發生產者 適用於 Java 的 AWS SDK](developing-producers-with-sdk.md)。如需 `RecordMaxBufferedTime` 以及 KPL 可供使用者設定的其他各項屬性的詳細資訊，請參閱 [設定 Amazon Kinesis Producer Library](kinesis-kpl-config.md)。

# 安裝 KPL
<a name="kinesis-kpl-dl-install"></a>

Amazon 為 macOS、Windows 和最近的 Linux 發行版本提供預先建置的 C\$1\$1 Amazon Kinesis Producer Library (KPL) 二進位檔 （如需支援的平台詳細資訊，請參閱下一節）。這些二進位檔封裝於 Java .jar 檔案中，若您使用 Maven 安裝該套件，即會自動叫用並予使用。如要尋找最新版的 KPL 和 KCL，請使用以下 Maven 搜尋連結：
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

Linux 二進位檔已使用 GNU 編譯器套件 (GCC) 進行編譯並靜態連結到 Linux 上的 libstdc\$1\$1。這些二進位檔應能適用於任何附有 glibc 2.5 或更高版本的 64 位元 Linux 發行版本。

舊版 Linux 發行版本的使用者可以使用 GitHub 上與來源一起提供的建置說明來建置 KPL。若要從 GitHub 下載 KPL，請參閱 [Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer)。

**重要**  
Amazon Kinesis Producer Library (KPL) 0.x 將於 2026 年 1 月 30 日end-of-support。我們**強烈建議**您使用 0.x 版將 KPL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KPL 版本。若要尋找最新的 KPL 版本，請參閱 [Github 上的 KPL 頁面](https://github.com/awslabs/amazon-kinesis-producer)。如需從 KPL 0.x 遷移至 KPL 1.x 的資訊，請參閱 [從 KPL 0.x 遷移至 KPL 1.x](kpl-migration-1x.md)。

# 從 KPL 0.x 遷移至 KPL 1.x
<a name="kpl-migration-1x"></a>

本主題提供step-by-step說明，將您的消費者從 KPL 0.x 遷移至 KPL 1.x。KPL 1.x 推出對 適用於 Java 的 AWS SDK 2.x 的支援，同時保持與舊版的介面相容性。您不需要更新核心資料處理邏輯，即可遷移至 KPL 1.x。

1. **請確定您有下列先決條件：**
   + Java 開發套件 (JDK) 8 或更新版本
   + 適用於 Java 的 AWS SDK 2.x
   + Maven 或 Gradle 用於相依性管理

1. **新增相依性**

   如果您使用的是 Maven，請將下列相依性新增至 pom.xml 檔案。請務必將 groupId 從 更新`com.amazonaws`為 ，`software.amazon.kinesis`並將 版本更新`1.x.x`為最新的 KPL 版本。

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   如果您使用的是 Gradle，請將以下內容新增至您的 `build.gradle` 檔案。請務必將 取代`1.x.x`為最新的 KPL 版本。

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   您可以在 [Maven Central Repository](https://central.sonatype.com/search?q=amazon-kinesis-producer) 上檢查最新版本的 KPL。

1. **更新 KPL 的匯入陳述式**

   KPL 1.x 使用 適用於 Java 的 AWS SDK 2.x，並使用以 開頭的更新套件名稱`software.amazon.kinesis`，相較於以 開頭的先前 KPL 中的套件名稱`com.amazonaws.services.kinesis`。

   將 的匯入取代`com.amazonaws.services.kinesis`為 `software.amazon.kinesis`。下表列出您必須取代的匯入。  
**匯入替換項目**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kpl-migration-1x.html)

1. **更新 AWS 登入資料提供者類別的匯入陳述式**

   遷移至 KPL 1.x 時，您必須在以 適用於 Java 的 AWS SDK 1.x 為基礎的 KPL 應用程式程式碼中，將匯入中的套件和類別更新為以 適用於 Java 的 AWS SDK 2.x 為基礎的對應套件和類別。KPL 應用程式中的常見匯入是登入資料提供者類別。如需[登入資料提供者變更](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)的完整清單，請參閱 適用於 Java 的 AWS SDK 2.x 遷移指南文件中的登入資料提供者變更。以下是您可能需要在 KPL 應用程式中進行的常見匯入變更。

   **在 KPL 0.x 中匯入**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **在 KPL 1.x 中匯入**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

   如果您根據 適用於 Java 的 AWS SDK 1.x 匯入任何其他登入資料提供者，則必須將其更新為 適用於 Java 的 AWS SDK 2.x 同等登入資料提供者。如果您未從 適用於 Java 的 AWS SDK 1.x 匯入任何類別/套件，您可以忽略此步驟。

1. **在 KPL 組態中更新登入資料提供者組態**

   KPL 1.x 中的登入資料提供者組態需要 適用於 Java 的 AWS SDK 2.x 登入資料提供者。如果您要透過覆寫預設登入資料提供者`KinesisProducerConfiguration`，在 中傳遞 適用於 Java 的 AWS SDK 1.x 的登入資料提供者，您必須使用 適用於 Java 的 AWS SDK 2.x 登入資料提供者進行更新。如需[登入資料提供者變更](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)的完整清單，請參閱 適用於 Java 的 AWS SDK 2.x 遷移指南文件中的登入資料提供者變更。如果您沒有覆寫 KPL 組態中的預設登入資料提供者，您可以忽略此步驟。

   例如，如果您使用下列程式碼覆寫 KPL 的預設登入資料提供者：

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

   您必須使用下列程式碼更新它們，才能使用 適用於 Java 的 AWS SDK 2.x 登入資料提供者：

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# 轉換至 KPL 的 Amazon Trust Services (ATS) 憑證
<a name="kinesis-kpl-upgrades"></a>

於 2018 年 2 月 9 日上午 9:00 (太平洋標準時間)，Amazon Kinesis Data Streams 已安裝 ATS 憑證。若要繼續使用 Amazon Kinesis Producer Library (KPL) 將記錄寫入 Kinesis Data Streams，您必須將 KPL 的安裝升級至 [0.12.6 版或更新版本。](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar)此變更會影響所有 AWS 區域。

如需有關移至 ATS 的資訊，請參閱[如何準備 AWS移至其自己的憑證授權單位](https://aws.amazon.com/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/)。

若您遇到問題而需要技術支援，請透過 AWS Support 中心[建立案例](https://console.aws.amazon.com/support/v1#/case/create)。

# KPL 支援的平台
<a name="kinesis-kpl-supported-plats"></a>

Amazon Kinesis Producer Library (KPL) 是以 C\$1\$1 撰寫，並做為主要使用者程序的子程序執行。預先編譯的 64 位元原生二進位檔隨附於 Java 版本，由 Java 包裝函式管理。

Java 套件在以下作業系統執行無須安裝任何其他程式庫：
+ 核心為 2.6.18 (2006 年 9 月) 及更新版本的 Linux 發行版本
+ Apple iOS X 10.9 及更新版本
+ Windows Server 2008 及更新版本
**重要**  
所有 KPL 版本 (最高可達版本 0.14.0) 都支援 Windows Server 2008 和更新版本。  
從 KPL 版本 0.14.0 或更高版本開始，不支援 Windows 平台。

請注意，KPL 只有 64 位元版本。

## 來源碼
<a name="kinesis-kpl-supported-plats-source-code"></a>

如果 KPL 安裝所提供的二進位檔未能滿足您的環境，KPL 的核心已編寫成 C\$1\$1 模組。C\$1\$1 模組的原始碼和 Java 界面是根據 Amazon Public License 發行，可在 GitHub 上的 [Amazon Kinesis Producer Library ](https://github.com/awslabs/amazon-kinesis-producer)取得。儘管 KPL 可在已有符合標準的最新版 C\$1\$1 編譯器和 JRE 的任何平台上使用，Amazon 仍未正式支援未列於支援的平台清單中的任何平台。

# KPL 關鍵概念
<a name="kinesis-kpl-concepts"></a>

下列各節包含了解和受益於 Amazon Kinesis Producer Library (KPL) 的必要概念和術語。

**Topics**
+ [記錄](#kinesis-kpl-concepts-records)
+ [批次處理](#kinesis-kpl-concepts-batching)
+ [聚合](#kinesis-kpl-concepts-aggretation)
+ [收集](#kinesis-kpl-concepts-collection)

## 記錄
<a name="kinesis-kpl-concepts-records"></a>

本指南區分了 *KPL 使用者記錄*和 *Kinesis Data Streams 記錄*。當使用不帶限定詞的*記錄*一詞時，指的就是 *KPL 使用者記錄*。當我們參照 Kinesis Data Streams 記錄時，我們明確地說 *Kinesis Data Streams 記錄*。

KPL 使用者記錄是對使用者具有特別含意的資料 Blob。例子包括代表網站 UI 事件或 web 伺服器某個日誌項目的 JSON Blob。

Kinesis Data Streams 記錄是 Kinesis 資料串流服務 API 所定義之 `Record` 資料結構的執行個體。此種記錄包含分割區索引鍵、序號和資料 Blob。

## 批次處理
<a name="kinesis-kpl-concepts-batching"></a>

*批次處理*是指對多個項目執行單一動作，而不是對每個單獨項目重複執行該動作。

就此處而言，「項目」是一筆記錄，「動作」則是將記錄傳送至 Kinesis Data Streams。在非批次處理的情況下，您會將每筆記錄放入單獨的 Kinesis Data Streams 記錄中，然後發出一次 HTTP 請求將其傳送至 Kinesis Data Streams。透過批次處理，每次 HTTP 請求可攜帶多筆記錄而不只有一筆。

KPL 支援兩種批次處理方式：
+ *彙整* – 將多筆記錄存放於單筆 Kinesis Data Streams 記錄中。
+ *收集* – 使用 API `PutRecords` 操作，將多筆 Kinesis Data Streams 記錄傳送至 Kinesis 資料串流中的一個或多個碎片。

KPL 兩種批次處理方式的設計彼此共存並可單獨啟用或停用。預設情況下，兩者將一併啟用。

## 聚合
<a name="kinesis-kpl-concepts-aggretation"></a>

*彙整*是指將多筆記錄儲存於單筆 Kinesis Data Streams 記錄中。彙整使得客戶能夠增加每次 API 呼叫所傳送的記錄筆數，進而有效地提高生產者傳輸量。

Kinesis Data Streams 碎片支援每秒多達 1,000 筆 Kinesis Data Streams 記錄或 1 MB 輸送量。Kinesis Data Streams 每秒記錄筆數限制綁定記錄在 1 KB 以內的客戶。記錄彙整使得客戶能夠將多筆記錄合併為單筆 Kinesis Data Streams 記錄。客戶可藉此提高其每一碎片傳輸量。

假設區域 us-east-1 中有一個碎片目前以每秒 1，000 個記錄的固定速率執行，且每個記錄為 512 個位元組。透過 KPL 彙整，您可以將這 1,000 筆記錄壓縮成只有 10 筆 Kinesis Data Streams 記錄，進而使 RPS 降至 10 (每次 50 KB)。

## 收集
<a name="kinesis-kpl-concepts-collection"></a>

*收集*是指批次處理多筆 Kinesis Data Streams 記錄並透過對 API 操作 `PutRecords`​ 呼叫的單次 HTTP 請求來傳送記錄，而非以各自的 HTTP 請求來傳送每個 ​Kinesis Data Streams 記錄。

這與不使用收集相比可提高傳輸量，因為其減少了發出多次個別 HTTP 請求的額外負擔。事實上，`PutRecords` 本身即是專為達到此目的而設計。

收集與彙整不同，其將處理 Kinesis Data Streams 記錄群組。正在收集的 Kinesis Data Streams 記錄仍可包含來自使用者的多筆記錄。兩者間的關係可用以下視覺化方式表達：

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# 整合 KPL 與生產者程式碼
<a name="kinesis-kpl-integration"></a>

Amazon Kinesis Producer Library (KPL) 會以個別程序執行，並使用 IPC 與您的父使用者程序通訊。此架構有時稱為[微服務](http://en.wikipedia.org/wiki/Microservices)，選用的主要原因有二：

**1) 即使 KPL 當機也不會造成您的使用者程序當機**  
您的程序可執行與 Kinesis Producer Library (KPL) 無關的任務，而且就算 KPL 當機也仍能繼續操作。您的父使用者程序也可重新啟動 KPL 並恢復到完全運作狀態 (此功能由官方包裝函式提供)。

例如，若 web 伺服器傳送指標至 Kinesis Data Streams，就算 Kinesis Data Streams 部分已停止運作，伺服器仍能繼續處理頁面。整部伺服器會由於 KPL 中存在錯誤而當機，以致造成不必要的停機。

**2) 任意用戶端均可支援**  
客戶當中肯定會有人使用非正式支援的語言。這類客戶也應能輕鬆使用 KPL。

## 建議使用矩陣
<a name="kinesis-kpl-integration-usage"></a>

下列用量矩陣會列出不同使用者的建議設定，並建議您是否及如何使用 KPL。請切記，如果啟用彙整，則必須連帶使用取消彙整以便在消費者端擷取記錄。


| 生產者端語言 | 消費者端語言 | KCL 版本 | 檢查點邏輯 | 能否使用 KPL？ | 警告 | 
| --- | --- | --- | --- | --- | --- | 
| Java 除外的任何語言 | \$1 | \$1 | \$1 | 否 | N/A | 
| Java | Java | 直接使用 Java 開發套件 | N/A | 是 | 如果使用彙整，則呼叫 GetRecords 後必須使用現成提供的取消彙整程式庫。 | 
| Java | Java 除外的任何語言 | 直接使用軟體開發套件 | N/A | 是 | 必須停用彙整。 | 
| Java | Java | 1.3.x | N/A | 是 | 必須停用彙整。 | 
| Java | Java  | 1.4.x | 不帶任何引數呼叫檢查點 | 是 | 無 | 
| Java | Java | 1.4.x | 使用顯式序號呼叫檢查點 | 是 | 停用彙整，或將程式碼改為使用擴展序號執行檢查點作業。 | 
| Java | Java 除外的任何語言  | 1.3.x \$1 多語言協助程式 \$1 特定語言包裝函式 | N/A | 是 | 必須停用彙整。 | 

# 使用 KPL 寫入 Kinesis 資料串流
<a name="kinesis-kpl-writing"></a>

下列各節顯示從最基本生產者到完全非同步程式碼的進度中的範例程式碼。

## Barebones 生產者程式碼
<a name="kinesis-kpl-writing-code"></a>

以下是撰寫最低限度能夠運作的生產者所需的全部程式碼。Amazon Kinesis Producer Library (KPL) 使用者記錄會在背景處理。

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 同步回應結果
<a name="kinesis-kpl-writing-synchronous"></a>

前述範例的程式碼並未檢查 KPL 使用者記錄是否成功。KPL 會就失敗狀況執行任何必要的重試。然而若您想要檢查結果，則可使用 `Future` 所傳回的 `addUserRecord` 物件進行檢查，如以下範例所示 (另顯示前述範例以供對照)：

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 非同步回應結果
<a name="kinesis-kpl-writing-asynchronous"></a>

先前的範例是在`Future`物件`get()`上呼叫 ，這會封鎖執行時間。如果您不想封鎖執行時間，可以使用非同步回呼，如下列範例所示：

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```

# 設定 Amazon Kinesis Producer Library
<a name="kinesis-kpl-config"></a>

儘管預設的設定應能適用於大多數使用案例，但您也許想要變更某些預設值，以依照您的需求量身打造 `KinesisProducer` 的行為。為此，您可以將 `KinesisProducerConfiguration` 類別的執行個體傳遞給 `KinesisProducer` 建構函數，例如：

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

您也可以從屬性檔案載入組態：

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

您可以替換使用者程序能夠存取的任何路徑和檔案名稱。此外，您亦可對以此方式建立的 `KinesisProducerConfiguration` 執行個體呼叫 set 方法以自訂組態。

屬性檔案指定參數時應使用各參數的帕斯卡命名法名稱。這類名稱與 `KinesisProducerConfiguration` 類別的 set 方法中所使用的名稱相吻。例如：

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

如需組態參數用法規則及數值限制的詳細資訊，請參閱 [GitHub 上的範例組態屬性](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties)。

請注意，一旦 `KinesisProducer` 初始化，變更使用中的 `KinesisProducerConfiguration` 執行個體將不會再有任何作用。`KinesisProducer`​ 目前不支援動態重新配置。

# 實作消費者取消彙總
<a name="kinesis-kpl-consumer-deaggregation"></a>

自發行版本 1.4.0 起，KCL 支援自動取消彙整 KPL 使用者記錄。在您更新 KCL 後，使用舊版 KCL 撰寫的取用者應用程式將編譯程式碼而不會進行任何修改。不過，如果生產者端使用了 KPL 彙整，則有一項與檢查點作業相關的細微之處須留意：彙整的記錄中所有的子記錄具有相同的序號，所以若您需要區別子記錄，就必須隨檢查點存放額外的資料。這些額外的資料稱為*子序號*。

**Topics**
+ [從舊版 KCL 遷移](#kinesis-kpl-consumer-deaggregation-migration)
+ [使用 KPL 去彙總的 KCL 擴充功能](#kinesis-kpl-consumer-deaggregation-extensions)
+ [直接使用 GetRecords](#kinesis-kpl-consumer-deaggregation-getrecords)

## 從舊版 KCL 遷移
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

您不需要變更現有的呼叫，即可使用彙總執行檢查點。保證您仍能成功擷取存放於 Kinesis Data Streams 的所有記錄。KCL 現在提供兩個新的檢查點操作，以支援特定使用案例，如下所述。

如果您在 KPL 支援之前為 KCL 編寫現有程式碼，而且您的檢查點操作在沒有引數的情況下呼叫，則相當於對批次中最後一個 KPL 使用者記錄的序號進行檢查點。如果使用序號字串呼叫檢查點操作，則等同於對批次的指定序號及隱含的子序號 0 (零) 執行檢查點作業。

呼叫新的 KCL 檢查點操作而不帶任何引數如 `checkpoint()` 語意上等同於對批次中其上次 `Record` 呼叫的序號及隱含的子序號 0 (零) 執行檢查點作業。

呼叫新的 KCL 檢查點操作如 `checkpoint(Record record)` 語意上等同於對指定 `Record` 的序號及隱含的子序號 0 (零) 執行檢查點作業。若 `Record` 呼叫實際為 `UserRecord`，則會對 `UserRecord` 序號和子序號執行檢查點作業。

呼叫新的 KCL 檢查點操作如 `checkpoint(String sequenceNumber, long subSequenceNumber)` 會對指定的序號及指定的子序號執行明示檢查點作業。

上述任何情況下，當檢查點已存放於 Amazon DynamoDB 檢查點資料表之後，KCL 便能正確地恢復擷取記錄，就算應用程式當機並重新啟動也沒問題。如果序列中包含多筆記錄，則會從序號最近執行過檢查點作業的記錄中為下一個子序號的記錄開始擷取。若最近的檢查點包括前一序號記錄的最新子序號，將從下一個序號的記錄開始擷取。

下一節討論了消費者的序列和子序列檢查點的詳細資訊，必須避免略過和重複記錄。若停止並重新啟動消費者的記錄處理會略過 (或重複) 記錄無關緊要，您就可以執行現有的程式碼而無須修改。

## 使用 KPL 去彙總的 KCL 擴充功能
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

KPL 去彙總可能涉及子序列檢查點。為方便使用子序列檢查點作業，KCL 增加了 `UserRecord` 類別：

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

現已使用此類別代替 `Record`。這不會破壞現有的程式碼，因為其為 `Record` 的子類別。`UserRecord` 類別同時代表實際的子記錄和未彙整的標準記錄。未彙整的記錄可想像成恰有一筆子記錄的已彙整記錄。

此外，`IRecordProcessorCheckpointer` 也增加了兩項新的操作：

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

若要開始使用子序號檢查點作業，您可進行以下轉換。更改以下形式的程式碼：

```
checkpointer.checkpoint(record.getSequenceNumber());
```

新形式的程式碼：

```
checkpointer.checkpoint(record);
```

建議您使用 `checkpoint(Record record)` 形式執行子序列檢查點作業。不過，若您已將 `sequenceNumbers` 存放在字串中用於檢查點作業，則現在亦應存放 `subSequenceNumber`，如以下範例所示：

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

從 轉換為`UserRecord` `Record`一律會成功，因為實作一律使用 `UserRecord`。除非需要對序號進行算術運算，否則這種方式並不建議。

處理 KPL 使用者記錄時，KCL 會將子序號寫入 Amazon DynamoDB 成為每一列的額外欄位。舊版 KCL 是使用 `AFTER_SEQUENCE_NUMBER` 在恢復檢查點作業時擷取記錄。目前有 KPL 支援的 KCL 則改用 `AT_SEQUENCE_NUMBER`。在擷取已對序號執行過檢查點作業的記錄時，會檢查已執行檢查點作業的子序號，且將視需要刪除子記錄 (若已對最後一筆子記錄執行檢查點作業，則可能全部刪除)。同樣地，未彙整的記錄可想像成只有一筆子記錄的已彙整記錄，所以同一套演算法對已彙整和未彙整的記錄都適用。

## 直接使用 GetRecords
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

您也可以選擇不使用 KCL 而是直接調用 API 操作 `GetRecords` 來擷取 Kinesis Data Streams 記錄。若要將這些擷取到的記錄解壓縮為原始 KPL 使用者記錄，請呼叫 `UserRecord.java` 的以下任一項靜態操作：

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

第一項操作使用 `0` 的預設值 `startingHashKey` (零) 以及 `2^128 -1` 的預設值 `endingHashKey`。

上述每一項操作都將取消彙總指定的 Kinesis Data Streams 記錄清單，轉成 KPL 使用者記錄清單。傳回的記錄清單將會捨棄顯式雜湊索引鍵或分割區索引鍵落在 `startingHashKey` (含) 以及 `endingHashKey` (含) 範圍外的任何 KPL 使用者記錄。

# 搭配 Amazon Data Firehose 使用 KPL
<a name="kpl-with-firehose"></a>

如果您使用 Kinesis Producer Library (KPL) 來寫入資料到 ​Kinesis 資料串流，則可使用彙整來合併您寫入至該 Kinesis 資料串流的記錄。如果您接著使用該資料串流做為 Firehose 交付串流的來源，Firehose 會先取消彙總記錄，再將記錄交付至目的地。如果您將交付串流設定為轉換資料，Firehose 會先取消彙總記錄，再將其交付至其中 AWS Lambda。如需更多資訊，請參閱[使用 Kinesis Data Streams 寫入至 Amazon Kinesis Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html)。

# 使用 KPL 搭配 AWS Glue 結構描述登錄檔
<a name="kpl-with-schemaregistry"></a>

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的方法之一是透過 KPL 和 Java 中的 Kinesis Client Library (KCL) 程式庫。

**重要**  
目前，Kinesis Data Streams AWS Glue 和結構描述登錄整合僅支援使用在 Java 中實作的 KPL 生產者的 Kinesis 資料串流。不提供多語言支援。

如需如何使用 KPL 設定 Kinesis Data Streams 與結構描述登錄檔整合的詳細說明，請參閱「使用 KPL/KCL 程式庫與資料互動」一節[：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)。

# 設定 KPL 代理組態
<a name="kpl-proxy-configuration"></a>

對於無法直接連線至網際網路的應用程式，所有 AWS SDK 用戶端都支援使用 HTTP 或 HTTPS 代理。在一般的企業環境中，所有輸出網路流量都必須經過代理伺服器。如果您的應用程式使用 Kinesis Producer Library (KPL) 在使用代理伺服器 AWS 的環境中收集和傳送資料至 ，您的應用程式將需要 KPL 代理組態。KPL 是建置在 AWS Kinesis SDK 上的高階程式庫。它被分成一個原生處理程序和一個包裝函式。原生處理程序會執行處理和傳送記錄的所有作業，而包裝函式則會管理原生處理序並與其通訊。如需詳細資訊，請參閱[使用 Amazon Kinesis Producer Library 實作高效且可靠的生產者](https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/)。

包裝函式是以 Java 撰寫的，而原生處理程序是使用 Kinesis SDK 以 C\$1\$1 撰寫的。KPL 版本 0.14.7 及更高版本現在支援 Java 包裝器中的代理組態，該包裝器可以將所有代理組態傳遞給原生處理程序。如需詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7)。

您可以使用以下程式碼將代理組態新增到您的 KPL 應用程式。

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# KPL 版本生命週期政策
<a name="kpl-version-lifecycle-policy"></a>

本主題概述 Amazon Kinesis Producer Library (KPL) 的版本生命週期政策。 會 AWS 定期提供 KPL 版本的新版本，以支援新功能和增強功能、錯誤修正、安全修補程式和相依性更新。我們建議您隨時掌握 KPL up-to-date以掌握最新功能、安全性更新和基礎相依性。**我們不**建議繼續使用不支援的 KPL 版本。

主要 KPL 版本的生命週期包含下列三個階段：
+ **一般可用性 (GA)** – 在此階段，完全支援主要版本。 AWS 提供一般次要和修補程式版本，包括對 Kinesis Data Streams 新功能或 API 更新的支援，以及錯誤和安全性修正。
+ **維護模式** – AWS 限制修補程式版本，以僅解決重大錯誤修正和安全問題。主要版本不會收到 Kinesis Data Streams 新功能或 APIs 的更新。
+ **End-of-support** – 主要版本將不再收到更新或版本。先前發佈的版本將繼續透過公有套件管理員提供，且程式碼將保留在 GitHub 上。使用者可自行決定是否使用已end-of-support版本。我們建議您升級至最新的主要版本。


| 主要版本 | 目前階段 | 版本日期 | 維護模式日期 | End-of-support日期 | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | 維護模式 | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | 一般可用性 | 2024-12-15 | -- | -- | 