

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將資料寫入 Amazon Kinesis Data Streams
<a name="building-producers"></a>

*生產者*是將資料寫入 Amazon Kinesis Data Streams 的應用程式。您可以使用 適用於 Java 的 AWS SDK 和 Kinesis Producer Library (KPL) 為 Kinesis Data Streams 建置生產者。

如果您是 Kinesis Data Streams 的新手，請先熟悉一下 [什麼是 Amazon Kinesis Data Streams？](introduction.md) 及 [使用 AWS CLI 執行 Amazon Kinesis Data Streams 操作](getting-started.md) 所介紹的概念和術語。

**重要**  
Kinesis Data Streams 支援對資料串流變更資料記錄保留期間。如需詳細資訊，請參閱[變更資料保留期間](kinesis-extended-retention.md)。

若要將資料放入串流，您必須指定串流的名稱、分割區索引鍵以及要加入至串流的資料 Blob。分割區索引鍵用於決定串流中將要加入資料記錄的碎片。

碎片中的所有資料會傳送給負責處理碎片的同一個工作者。您應使用哪個分割區索引鍵取決於您的應用程式邏輯。通常，分割區索引鍵數目應該要比碎片數目大得多。這是因為分割區索引鍵將用於決定資料記錄如何對應到特定碎片。如果分割區索引鍵數目夠多，資料就能均勻地分佈於串流中的各個碎片。

**Topics**
+ [使用 Amazon Kinesis Producer Library (KPL) 開發生產者](developing-producers-with-kpl.md)
+ [使用 Amazon Kinesis Data Streams API 搭配 開發生產者 適用於 Java 的 AWS SDK](developing-producers-with-sdk.md)
+ [使用 Kinesis Agent 寫入 Amazon Kinesis Data Streams](writing-with-agents.md)
+ [使用其他服務寫入 Kinesis Data Streams AWS](using-other-services.md)
+ [使用第三方整合寫入 Kinesis Data Streams](using-other-services-third-party.md)
+ [Amazon Kinesis Data Streams 生產者疑難排解](troubleshooting-producers.md)
+ [最佳化 Kinesis Data Streams 生產者](advanced-producers.md)

# 使用 Amazon Kinesis Producer Library (KPL) 開發生產者
<a name="developing-producers-with-kpl"></a>

Amazon Kinesis Data Streams 生產者是將使用者資料記錄放入 Kinesis 資料串流 (又稱為*資料擷取*) 的應用程式。Amazon Kinesis Producer Library (KPL) 可簡化生產者應用程式開發，讓開發人員達到 Kinesis 資料串流的高寫入輸送量。

您可以使用 Amazon CloudWatch 對 KPL 進行監控。如需詳細資訊，請參閱[使用 Amazon CloudWatch 監控 Kinesis Producer Library](monitoring-with-kpl.md)。

**Topics**
+ [檢閱 KPL 的角色](#developing-producers-with-kpl-role)
+ [了解使用 KPL 的優點](#developing-producers-with-kpl-advantage)
+ [了解何時不使用 KPL](#developing-producers-with-kpl-when)
+ [安裝 KPL](kinesis-kpl-dl-install.md)
+ [從 KPL 0.x 遷移至 KPL 1.x](kpl-migration-1x.md)
+ [轉換至 KPL 的 Amazon Trust Services (ATS) 憑證](kinesis-kpl-upgrades.md)
+ [KPL 支援的平台](kinesis-kpl-supported-plats.md)
+ [KPL 關鍵概念](kinesis-kpl-concepts.md)
+ [整合 KPL 與生產者程式碼](kinesis-kpl-integration.md)
+ [使用 KPL 寫入 Kinesis 資料串流](kinesis-kpl-writing.md)
+ [設定 Amazon Kinesis Producer Library](kinesis-kpl-config.md)
+ [實作消費者取消彙總](kinesis-kpl-consumer-deaggregation.md)
+ [搭配 Amazon Data Firehose 使用 KPL](kpl-with-firehose.md)
+ [使用 KPL 搭配 AWS Glue 結構描述登錄檔](kpl-with-schemaregistry.md)
+ [設定 KPL 代理組態](kpl-proxy-configuration.md)
+ [KPL 版本生命週期政策](kpl-version-lifecycle-policy.md)

**注意**  
建議您升級至最新 KPL 版本。KPL 會定期更新為更新的版本，其中包括最新的相依性和安全性修補程式、錯誤修正，以及向後相容的新功能。如需詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/)。

## 檢閱 KPL 的角色
<a name="developing-producers-with-kpl-role"></a>

KPL 是一套具高度可設定性的易用程式庫，能協助您對 Kinesis 資料串流進行寫入。此程式庫在您的生產者應用程式的程式碼與 Kinesis Data Streams API 動作之間擔任媒介。KPL 將執行以下主要任務：
+ 利用可設定的自動重試機制對一個或多個 Kinesis 資料串流進行寫入
+ 收集記錄並於每次請求時使用 `PutRecords` 將多筆記錄寫入多個碎片
+ 彙整使用者記錄以增加承載大小並提高傳輸量
+ 與 [Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) (KCL) 無縫整合以取消彙整取用者上的批次記錄
+ 代表您提交 Amazon CloudWatch 指標以提供關於生產者效能的資訊

請注意，KPL 與 [AWS SDK](https://aws.amazon.com/tools/) 所提供的 Kinesis Data Streams API 不同。Kinesis Data Streams API 可協助您管理 Kinesis Data Streams 的許多層面 (包括建立串流、重新分片、放入與取得記錄)，而 KPL 提供專用於擷取資料的抽象層。如需 Kinesis Data Streams API 的相關資訊，請參閱 [Amazon Kinesis API 參考](https://docs.aws.amazon.com/kinesis/latest/APIReference/)。

## 了解使用 KPL 的優點
<a name="developing-producers-with-kpl-advantage"></a>

以下清單舉出使用 KPL 開發 Kinesis Data Streams 生產者的一些主要優點。

KPL 可用於同步或非同步使用案例。建議您使用具有較高效能的非同步界面，除非有具體的原因需要使用同步操作。如需以上兩種使用案例的詳細資訊及範例程式碼，請參閱[使用 KPL 寫入 Kinesis 資料串流](kinesis-kpl-writing.md)。

 **效能優勢**   
KPL 可協助建置高效能的生產者。試想以下情況：您的 Amazon EC2 執行個體擔任代理，從數以百計或千計的低功率裝置收集 100 位元組的事件並將記錄寫入 Kinesis 資料串流。這些 EC2 執行個體每秒均須將成千個事件寫入您的資料串流。為達到所需的傳輸量，生產者必須實作複雜的邏輯 (如批次處理或多執行緒) 和重試邏輯並在消費者端取消彙整記錄。KPL 將為您執行所有這些任務。

 **消費者端易用性**   
對於使用 KCL 的 Java 取用者端開發人員來說，KPL 整合毫不費力。當 KCL 擷取含有多筆 KPL 使用者記錄的彙整 Kinesis Data Streams 記錄時，會自動叫用 KPL 先擷取個別的使用者記錄，然後再將其傳回給使用者。  
取用者端開發人員若未使用 KCL 而是直接使用 API `GetRecords` 操作，則可利用 KPL Java 程式庫擷取個別的使用者記錄後再將其傳回給使用者。

 **生產者監控**   
您可使用 Amazon CloudWatch 和 KPL 收集、監控及分析您的 Kinesis Data Streams 生產者。KPL 將代表您向 CloudWatch 發出輸送量指標、錯誤指標及其他指標，並可設定成進行串流層級、碎片層級或生產者層級監控。

 **非同步架構**   
由於 KPL 可能會在將記錄傳送到 Kinesis Data Streams 之前緩衝記錄，因此不會強制發起人應用程式封鎖並等待確認記錄已送達伺服器，然後再繼續執行時間。將記錄放入 KPL 的呼叫一律立即傳回，不會等待傳送記錄或接收伺服器的回應。反而，其將建立 `Future` 物件用以稍後接收傳送記錄至 Kinesis Data Streams 的結果。這與 AWS SDK 中的非同步用戶端的行為相同。

## 了解何時不使用 KPL
<a name="developing-producers-with-kpl-when"></a>

KPL 可能引發程式庫中額外的處理延遲，最高達到 `RecordMaxBufferedTime` (使用者可設定)。`RecordMaxBufferedTime` 的值愈大，壓縮效率以及效能愈高。無法容忍此額外延遲的應用程式可能需要直接使用 AWS SDK。如需搭配 Kinesis Data Streams 使用 AWS SDK 的詳細資訊，請參閱 [使用 Amazon Kinesis Data Streams API 搭配 開發生產者 適用於 Java 的 AWS SDK](developing-producers-with-sdk.md)。如需 `RecordMaxBufferedTime` 以及 KPL 可供使用者設定的其他各項屬性的詳細資訊，請參閱 [設定 Amazon Kinesis Producer Library](kinesis-kpl-config.md)。

# 安裝 KPL
<a name="kinesis-kpl-dl-install"></a>

Amazon 為 macOS、Windows 和最近的 Linux 發行版本提供預先建置的 C\$1\$1 Amazon Kinesis Producer Library (KPL) 二進位檔 （如需支援的平台詳細資訊，請參閱下一節）。這些二進位檔封裝於 Java .jar 檔案中，若您使用 Maven 安裝該套件，即會自動叫用並予使用。如要尋找最新版的 KPL 和 KCL，請使用以下 Maven 搜尋連結：
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

Linux 二進位檔已使用 GNU 編譯器套件 (GCC) 進行編譯並靜態連結到 Linux 上的 libstdc\$1\$1。這些二進位檔應能適用於任何附有 glibc 2.5 或更高版本的 64 位元 Linux 發行版本。

舊版 Linux 發行版本的使用者可以使用 GitHub 上與來源一起提供的建置說明來建置 KPL。若要從 GitHub 下載 KPL，請參閱 [Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer)。

**重要**  
Amazon Kinesis Producer Library (KPL) 0.x 將於 2026 年 1 月 30 日end-of-support。我們**強烈建議**您使用 0.x 版將 KPL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KPL 版本。若要尋找最新的 KPL 版本，請參閱 [Github 上的 KPL 頁面](https://github.com/awslabs/amazon-kinesis-producer)。如需從 KPL 0.x 遷移至 KPL 1.x 的資訊，請參閱 [從 KPL 0.x 遷移至 KPL 1.x](kpl-migration-1x.md)。

# 從 KPL 0.x 遷移至 KPL 1.x
<a name="kpl-migration-1x"></a>

本主題提供step-by-step說明，將您的消費者從 KPL 0.x 遷移至 KPL 1.x。KPL 1.x 推出對 適用於 Java 的 AWS SDK 2.x 的支援，同時保持與舊版的介面相容性。您不需要更新核心資料處理邏輯，即可遷移至 KPL 1.x。

1. **請確定您有下列先決條件：**
   + Java 開發套件 (JDK) 8 或更新版本
   + 適用於 Java 的 AWS SDK 2.x
   + Maven 或 Gradle 用於相依性管理

1. **新增相依性**

   如果您使用的是 Maven，請將下列相依性新增至 pom.xml 檔案。請務必將 groupId 從 更新`com.amazonaws`為 ，`software.amazon.kinesis`並將 版本更新`1.x.x`為最新的 KPL 版本。

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   如果您使用的是 Gradle，請將以下內容新增至您的 `build.gradle` 檔案。請務必將 取代`1.x.x`為最新的 KPL 版本。

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   您可以在 [Maven Central Repository](https://central.sonatype.com/search?q=amazon-kinesis-producer) 上檢查最新版本的 KPL。

1. **更新 KPL 的匯入陳述式**

   KPL 1.x 使用 適用於 Java 的 AWS SDK 2.x，並使用以 開頭的更新套件名稱`software.amazon.kinesis`，相較於以 開頭的先前 KPL 中的套件名稱`com.amazonaws.services.kinesis`。

   將 的匯入取代`com.amazonaws.services.kinesis`為 `software.amazon.kinesis`。下表列出您必須取代的匯入。  
**匯入替換項目**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/streams/latest/dev/kpl-migration-1x.html)

1. **更新 AWS 登入資料提供者類別的匯入陳述式**

   遷移至 KPL 1.x 時，您必須在以 適用於 Java 的 AWS SDK 1.x 為基礎的 KPL 應用程式程式碼中，將匯入中的套件和類別更新為以 適用於 Java 的 AWS SDK 2.x 為基礎的對應套件和類別。KPL 應用程式中的常見匯入是登入資料提供者類別。如需[登入資料提供者變更](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)的完整清單，請參閱 適用於 Java 的 AWS SDK 2.x 遷移指南文件中的登入資料提供者變更。以下是您可能需要在 KPL 應用程式中進行的常見匯入變更。

   **在 KPL 0.x 中匯入**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **在 KPL 1.x 中匯入**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

   如果您根據 適用於 Java 的 AWS SDK 1.x 匯入任何其他登入資料提供者，則必須將其更新為 適用於 Java 的 AWS SDK 2.x 同等登入資料提供者。如果您未從 適用於 Java 的 AWS SDK 1.x 匯入任何類別/套件，您可以忽略此步驟。

1. **在 KPL 組態中更新登入資料提供者組態**

   KPL 1.x 中的登入資料提供者組態需要 適用於 Java 的 AWS SDK 2.x 登入資料提供者。如果您要透過覆寫預設登入資料提供者`KinesisProducerConfiguration`，在 中傳遞 適用於 Java 的 AWS SDK 1.x 的登入資料提供者，您必須使用 適用於 Java 的 AWS SDK 2.x 登入資料提供者進行更新。如需[登入資料提供者變更](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)的完整清單，請參閱 適用於 Java 的 AWS SDK 2.x 遷移指南文件中的登入資料提供者變更。如果您沒有覆寫 KPL 組態中的預設登入資料提供者，您可以忽略此步驟。

   例如，如果您使用下列程式碼覆寫 KPL 的預設登入資料提供者：

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

   您必須使用下列程式碼更新它們，才能使用 適用於 Java 的 AWS SDK 2.x 登入資料提供者：

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# 轉換至 KPL 的 Amazon Trust Services (ATS) 憑證
<a name="kinesis-kpl-upgrades"></a>

於 2018 年 2 月 9 日上午 9:00 (太平洋標準時間)，Amazon Kinesis Data Streams 已安裝 ATS 憑證。若要繼續使用 Amazon Kinesis Producer Library (KPL) 將記錄寫入 Kinesis Data Streams，您必須將 KPL 的安裝升級至 [0.12.6 版或更新版本。](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar)此變更會影響所有 AWS 區域。

如需有關移至 ATS 的資訊，請參閱[如何準備 AWS移至其自己的憑證授權單位](https://aws.amazon.com/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/)。

若您遇到問題而需要技術支援，請透過 AWS Support 中心[建立案例](https://console.aws.amazon.com/support/v1#/case/create)。

# KPL 支援的平台
<a name="kinesis-kpl-supported-plats"></a>

Amazon Kinesis Producer Library (KPL) 是以 C\$1\$1 撰寫，並做為主要使用者程序的子程序執行。預先編譯的 64 位元原生二進位檔隨附於 Java 版本，由 Java 包裝函式管理。

Java 套件在以下作業系統執行無須安裝任何其他程式庫：
+ 核心為 2.6.18 (2006 年 9 月) 及更新版本的 Linux 發行版本
+ Apple iOS X 10.9 及更新版本
+ Windows Server 2008 及更新版本
**重要**  
所有 KPL 版本 (最高可達版本 0.14.0) 都支援 Windows Server 2008 和更新版本。  
從 KPL 版本 0.14.0 或更高版本開始，不支援 Windows 平台。

請注意，KPL 只有 64 位元版本。

## 來源碼
<a name="kinesis-kpl-supported-plats-source-code"></a>

如果 KPL 安裝所提供的二進位檔未能滿足您的環境，KPL 的核心已編寫成 C\$1\$1 模組。C\$1\$1 模組的原始碼和 Java 界面是根據 Amazon Public License 發行，可在 GitHub 上的 [Amazon Kinesis Producer Library ](https://github.com/awslabs/amazon-kinesis-producer)取得。儘管 KPL 可在已有符合標準的最新版 C\$1\$1 編譯器和 JRE 的任何平台上使用，Amazon 仍未正式支援未列於支援的平台清單中的任何平台。

# KPL 關鍵概念
<a name="kinesis-kpl-concepts"></a>

下列各節包含了解和受益於 Amazon Kinesis Producer Library (KPL) 的必要概念和術語。

**Topics**
+ [記錄](#kinesis-kpl-concepts-records)
+ [批次處理](#kinesis-kpl-concepts-batching)
+ [聚合](#kinesis-kpl-concepts-aggretation)
+ [收集](#kinesis-kpl-concepts-collection)

## 記錄
<a name="kinesis-kpl-concepts-records"></a>

本指南區分了 *KPL 使用者記錄*和 *Kinesis Data Streams 記錄*。當使用不帶限定詞的*記錄*一詞時，指的就是 *KPL 使用者記錄*。當我們參照 Kinesis Data Streams 記錄時，我們明確地說 *Kinesis Data Streams 記錄*。

KPL 使用者記錄是對使用者具有特別含意的資料 Blob。例子包括代表網站 UI 事件或 web 伺服器某個日誌項目的 JSON Blob。

Kinesis Data Streams 記錄是 Kinesis 資料串流服務 API 所定義之 `Record` 資料結構的執行個體。此種記錄包含分割區索引鍵、序號和資料 Blob。

## 批次處理
<a name="kinesis-kpl-concepts-batching"></a>

*批次處理*是指對多個項目執行單一動作，而不是對每個單獨項目重複執行該動作。

就此處而言，「項目」是一筆記錄，「動作」則是將記錄傳送至 Kinesis Data Streams。在非批次處理的情況下，您會將每筆記錄放入單獨的 Kinesis Data Streams 記錄中，然後發出一次 HTTP 請求將其傳送至 Kinesis Data Streams。透過批次處理，每次 HTTP 請求可攜帶多筆記錄而不只有一筆。

KPL 支援兩種批次處理方式：
+ *彙整* – 將多筆記錄存放於單筆 Kinesis Data Streams 記錄中。
+ *收集* – 使用 API `PutRecords` 操作，將多筆 Kinesis Data Streams 記錄傳送至 Kinesis 資料串流中的一個或多個碎片。

KPL 兩種批次處理方式的設計彼此共存並可單獨啟用或停用。預設情況下，兩者將一併啟用。

## 聚合
<a name="kinesis-kpl-concepts-aggretation"></a>

*彙整*是指將多筆記錄儲存於單筆 Kinesis Data Streams 記錄中。彙整使得客戶能夠增加每次 API 呼叫所傳送的記錄筆數，進而有效地提高生產者傳輸量。

Kinesis Data Streams 碎片支援每秒多達 1,000 筆 Kinesis Data Streams 記錄或 1 MB 輸送量。Kinesis Data Streams 每秒記錄筆數限制綁定記錄在 1 KB 以內的客戶。記錄彙整使得客戶能夠將多筆記錄合併為單筆 Kinesis Data Streams 記錄。客戶可藉此提高其每一碎片傳輸量。

假設區域 us-east-1 中有一個碎片目前以每秒 1，000 個記錄的固定速率執行，且每個記錄為 512 個位元組。透過 KPL 彙整，您可以將這 1,000 筆記錄壓縮成只有 10 筆 Kinesis Data Streams 記錄，進而使 RPS 降至 10 (每次 50 KB)。

## 收集
<a name="kinesis-kpl-concepts-collection"></a>

*收集*是指批次處理多筆 Kinesis Data Streams 記錄並透過對 API 操作 `PutRecords`​ 呼叫的單次 HTTP 請求來傳送記錄，而非以各自的 HTTP 請求來傳送每個 ​Kinesis Data Streams 記錄。

這與不使用收集相比可提高傳輸量，因為其減少了發出多次個別 HTTP 請求的額外負擔。事實上，`PutRecords` 本身即是專為達到此目的而設計。

收集與彙整不同，其將處理 Kinesis Data Streams 記錄群組。正在收集的 Kinesis Data Streams 記錄仍可包含來自使用者的多筆記錄。兩者間的關係可用以下視覺化方式表達：

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# 整合 KPL 與生產者程式碼
<a name="kinesis-kpl-integration"></a>

Amazon Kinesis Producer Library (KPL) 會以個別程序執行，並使用 IPC 與您的父使用者程序通訊。此架構有時稱為[微服務](http://en.wikipedia.org/wiki/Microservices)，選用的主要原因有二：

**1) 即使 KPL 當機也不會造成您的使用者程序當機**  
您的程序可執行與 Kinesis Producer Library (KPL) 無關的任務，而且就算 KPL 當機也仍能繼續操作。您的父使用者程序也可重新啟動 KPL 並恢復到完全運作狀態 (此功能由官方包裝函式提供)。

例如，若 web 伺服器傳送指標至 Kinesis Data Streams，就算 Kinesis Data Streams 部分已停止運作，伺服器仍能繼續處理頁面。整部伺服器會由於 KPL 中存在錯誤而當機，以致造成不必要的停機。

**2) 任意用戶端均可支援**  
客戶當中肯定會有人使用非正式支援的語言。這類客戶也應能輕鬆使用 KPL。

## 建議使用矩陣
<a name="kinesis-kpl-integration-usage"></a>

下列用量矩陣會列出不同使用者的建議設定，並建議您是否及如何使用 KPL。請切記，如果啟用彙整，則必須連帶使用取消彙整以便在消費者端擷取記錄。


| 生產者端語言 | 消費者端語言 | KCL 版本 | 檢查點邏輯 | 能否使用 KPL？ | 警告 | 
| --- | --- | --- | --- | --- | --- | 
| Java 除外的任何語言 | \$1 | \$1 | \$1 | 否 | N/A | 
| Java | Java | 直接使用 Java 開發套件 | N/A | 是 | 如果使用彙整，則呼叫 GetRecords 後必須使用現成提供的取消彙整程式庫。 | 
| Java | Java 除外的任何語言 | 直接使用軟體開發套件 | N/A | 是 | 必須停用彙整。 | 
| Java | Java | 1.3.x | N/A | 是 | 必須停用彙整。 | 
| Java | Java  | 1.4.x | 不帶任何引數呼叫檢查點 | 是 | 無 | 
| Java | Java | 1.4.x | 使用顯式序號呼叫檢查點 | 是 | 停用彙整，或將程式碼改為使用擴展序號執行檢查點作業。 | 
| Java | Java 除外的任何語言  | 1.3.x \$1 多語言協助程式 \$1 特定語言包裝函式 | N/A | 是 | 必須停用彙整。 | 

# 使用 KPL 寫入 Kinesis 資料串流
<a name="kinesis-kpl-writing"></a>

下列各節顯示從最基本生產者到完全非同步程式碼的進度中的範例程式碼。

## Barebones 生產者程式碼
<a name="kinesis-kpl-writing-code"></a>

以下是撰寫最低限度能夠運作的生產者所需的全部程式碼。Amazon Kinesis Producer Library (KPL) 使用者記錄會在背景處理。

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 同步回應結果
<a name="kinesis-kpl-writing-synchronous"></a>

前述範例的程式碼並未檢查 KPL 使用者記錄是否成功。KPL 會就失敗狀況執行任何必要的重試。然而若您想要檢查結果，則可使用 `Future` 所傳回的 `addUserRecord` 物件進行檢查，如以下範例所示 (另顯示前述範例以供對照)：

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 非同步回應結果
<a name="kinesis-kpl-writing-asynchronous"></a>

先前的範例是在`Future`物件`get()`上呼叫 ，這會封鎖執行時間。如果您不想封鎖執行時間，可以使用非同步回呼，如下列範例所示：

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```

# 設定 Amazon Kinesis Producer Library
<a name="kinesis-kpl-config"></a>

儘管預設的設定應能適用於大多數使用案例，但您也許想要變更某些預設值，以依照您的需求量身打造 `KinesisProducer` 的行為。為此，您可以將 `KinesisProducerConfiguration` 類別的執行個體傳遞給 `KinesisProducer` 建構函數，例如：

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

您也可以從屬性檔案載入組態：

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

您可以替換使用者程序能夠存取的任何路徑和檔案名稱。此外，您亦可對以此方式建立的 `KinesisProducerConfiguration` 執行個體呼叫 set 方法以自訂組態。

屬性檔案指定參數時應使用各參數的帕斯卡命名法名稱。這類名稱與 `KinesisProducerConfiguration` 類別的 set 方法中所使用的名稱相吻。例如：

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

如需組態參數用法規則及數值限制的詳細資訊，請參閱 [GitHub 上的範例組態屬性](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties)。

請注意，一旦 `KinesisProducer` 初始化，變更使用中的 `KinesisProducerConfiguration` 執行個體將不會再有任何作用。`KinesisProducer`​ 目前不支援動態重新配置。

# 實作消費者取消彙總
<a name="kinesis-kpl-consumer-deaggregation"></a>

自發行版本 1.4.0 起，KCL 支援自動取消彙整 KPL 使用者記錄。在您更新 KCL 後，使用舊版 KCL 撰寫的取用者應用程式將編譯程式碼而不會進行任何修改。不過，如果生產者端使用了 KPL 彙整，則有一項與檢查點作業相關的細微之處須留意：彙整的記錄中所有的子記錄具有相同的序號，所以若您需要區別子記錄，就必須隨檢查點存放額外的資料。這些額外的資料稱為*子序號*。

**Topics**
+ [從舊版 KCL 遷移](#kinesis-kpl-consumer-deaggregation-migration)
+ [使用 KPL 去彙總的 KCL 擴充功能](#kinesis-kpl-consumer-deaggregation-extensions)
+ [直接使用 GetRecords](#kinesis-kpl-consumer-deaggregation-getrecords)

## 從舊版 KCL 遷移
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

您不需要變更現有的呼叫，即可使用彙總執行檢查點。保證您仍能成功擷取存放於 Kinesis Data Streams 的所有記錄。KCL 現在提供兩個新的檢查點操作，以支援特定使用案例，如下所述。

如果您在 KPL 支援之前為 KCL 編寫現有程式碼，而且您的檢查點操作在沒有引數的情況下呼叫，則相當於對批次中最後一個 KPL 使用者記錄的序號進行檢查點。如果使用序號字串呼叫檢查點操作，則等同於對批次的指定序號及隱含的子序號 0 (零) 執行檢查點作業。

呼叫新的 KCL 檢查點操作而不帶任何引數如 `checkpoint()` 語意上等同於對批次中其上次 `Record` 呼叫的序號及隱含的子序號 0 (零) 執行檢查點作業。

呼叫新的 KCL 檢查點操作如 `checkpoint(Record record)` 語意上等同於對指定 `Record` 的序號及隱含的子序號 0 (零) 執行檢查點作業。若 `Record` 呼叫實際為 `UserRecord`，則會對 `UserRecord` 序號和子序號執行檢查點作業。

呼叫新的 KCL 檢查點操作如 `checkpoint(String sequenceNumber, long subSequenceNumber)` 會對指定的序號及指定的子序號執行明示檢查點作業。

上述任何情況下，當檢查點已存放於 Amazon DynamoDB 檢查點資料表之後，KCL 便能正確地恢復擷取記錄，就算應用程式當機並重新啟動也沒問題。如果序列中包含多筆記錄，則會從序號最近執行過檢查點作業的記錄中為下一個子序號的記錄開始擷取。若最近的檢查點包括前一序號記錄的最新子序號，將從下一個序號的記錄開始擷取。

下一節討論了消費者的序列和子序列檢查點的詳細資訊，必須避免略過和重複記錄。若停止並重新啟動消費者的記錄處理會略過 (或重複) 記錄無關緊要，您就可以執行現有的程式碼而無須修改。

## 使用 KPL 去彙總的 KCL 擴充功能
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

KPL 去彙總可能涉及子序列檢查點。為方便使用子序列檢查點作業，KCL 增加了 `UserRecord` 類別：

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

現已使用此類別代替 `Record`。這不會破壞現有的程式碼，因為其為 `Record` 的子類別。`UserRecord` 類別同時代表實際的子記錄和未彙整的標準記錄。未彙整的記錄可想像成恰有一筆子記錄的已彙整記錄。

此外，`IRecordProcessorCheckpointer` 也增加了兩項新的操作：

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

若要開始使用子序號檢查點作業，您可進行以下轉換。更改以下形式的程式碼：

```
checkpointer.checkpoint(record.getSequenceNumber());
```

新形式的程式碼：

```
checkpointer.checkpoint(record);
```

建議您使用 `checkpoint(Record record)` 形式執行子序列檢查點作業。不過，若您已將 `sequenceNumbers` 存放在字串中用於檢查點作業，則現在亦應存放 `subSequenceNumber`，如以下範例所示：

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

從 轉換為`UserRecord` `Record`一律會成功，因為實作一律使用 `UserRecord`。除非需要對序號進行算術運算，否則這種方式並不建議。

處理 KPL 使用者記錄時，KCL 會將子序號寫入 Amazon DynamoDB 成為每一列的額外欄位。舊版 KCL 是使用 `AFTER_SEQUENCE_NUMBER` 在恢復檢查點作業時擷取記錄。目前有 KPL 支援的 KCL 則改用 `AT_SEQUENCE_NUMBER`。在擷取已對序號執行過檢查點作業的記錄時，會檢查已執行檢查點作業的子序號，且將視需要刪除子記錄 (若已對最後一筆子記錄執行檢查點作業，則可能全部刪除)。同樣地，未彙整的記錄可想像成只有一筆子記錄的已彙整記錄，所以同一套演算法對已彙整和未彙整的記錄都適用。

## 直接使用 GetRecords
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

您也可以選擇不使用 KCL 而是直接調用 API 操作 `GetRecords` 來擷取 Kinesis Data Streams 記錄。若要將這些擷取到的記錄解壓縮為原始 KPL 使用者記錄，請呼叫 `UserRecord.java` 的以下任一項靜態操作：

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

第一項操作使用 `0` 的預設值 `startingHashKey` (零) 以及 `2^128 -1` 的預設值 `endingHashKey`。

上述每一項操作都將取消彙總指定的 Kinesis Data Streams 記錄清單，轉成 KPL 使用者記錄清單。傳回的記錄清單將會捨棄顯式雜湊索引鍵或分割區索引鍵落在 `startingHashKey` (含) 以及 `endingHashKey` (含) 範圍外的任何 KPL 使用者記錄。

# 搭配 Amazon Data Firehose 使用 KPL
<a name="kpl-with-firehose"></a>

如果您使用 Kinesis Producer Library (KPL) 來寫入資料到 ​Kinesis 資料串流，則可使用彙整來合併您寫入至該 Kinesis 資料串流的記錄。如果您接著使用該資料串流做為 Firehose 交付串流的來源，Firehose 會先取消彙總記錄，再將記錄交付至目的地。如果您將交付串流設定為轉換資料，Firehose 會先取消彙總記錄，再將其交付至其中 AWS Lambda。如需更多資訊，請參閱[使用 Kinesis Data Streams 寫入至 Amazon Kinesis Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html)。

# 使用 KPL 搭配 AWS Glue 結構描述登錄檔
<a name="kpl-with-schemaregistry"></a>

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的方法之一是透過 KPL 和 Java 中的 Kinesis Client Library (KCL) 程式庫。

**重要**  
目前，Kinesis Data Streams AWS Glue 和結構描述登錄整合僅支援使用在 Java 中實作的 KPL 生產者的 Kinesis 資料串流。不提供多語言支援。

如需如何使用 KPL 設定 Kinesis Data Streams 與結構描述登錄檔整合的詳細說明，請參閱「使用 KPL/KCL 程式庫與資料互動」一節[：將 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄檔整合](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)。

# 設定 KPL 代理組態
<a name="kpl-proxy-configuration"></a>

對於無法直接連線至網際網路的應用程式，所有 AWS SDK 用戶端都支援使用 HTTP 或 HTTPS 代理。在一般的企業環境中，所有輸出網路流量都必須經過代理伺服器。如果您的應用程式使用 Kinesis Producer Library (KPL) 在使用代理伺服器 AWS 的環境中收集和傳送資料至 ，您的應用程式將需要 KPL 代理組態。KPL 是建置在 AWS Kinesis SDK 上的高階程式庫。它被分成一個原生處理程序和一個包裝函式。原生處理程序會執行處理和傳送記錄的所有作業，而包裝函式則會管理原生處理序並與其通訊。如需詳細資訊，請參閱[使用 Amazon Kinesis Producer Library 實作高效且可靠的生產者](https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/)。

包裝函式是以 Java 撰寫的，而原生處理程序是使用 Kinesis SDK 以 C\$1\$1 撰寫的。KPL 版本 0.14.7 及更高版本現在支援 Java 包裝器中的代理組態，該包裝器可以將所有代理組態傳遞給原生處理程序。如需詳細資訊，請參閱 [https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7)。

您可以使用以下程式碼將代理組態新增到您的 KPL 應用程式。

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# KPL 版本生命週期政策
<a name="kpl-version-lifecycle-policy"></a>

本主題概述 Amazon Kinesis Producer Library (KPL) 的版本生命週期政策。 會 AWS 定期提供 KPL 版本的新版本，以支援新功能和增強功能、錯誤修正、安全修補程式和相依性更新。我們建議您隨時掌握 KPL up-to-date以掌握最新功能、安全性更新和基礎相依性。**我們不**建議繼續使用不支援的 KPL 版本。

主要 KPL 版本的生命週期包含下列三個階段：
+ **一般可用性 (GA)** – 在此階段，完全支援主要版本。 AWS 提供一般次要和修補程式版本，包括對 Kinesis Data Streams 新功能或 API 更新的支援，以及錯誤和安全性修正。
+ **維護模式** – AWS 限制修補程式版本，以僅解決重大錯誤修正和安全問題。主要版本不會收到 Kinesis Data Streams 新功能或 APIs 的更新。
+ **End-of-support** – 主要版本將不再收到更新或版本。先前發佈的版本將繼續透過公有套件管理員提供，且程式碼將保留在 GitHub 上。使用者可自行決定是否使用已end-of-support版本。我們建議您升級至最新的主要版本。


| 主要版本 | 目前階段 | 版本日期 | 維護模式日期 | End-of-support日期 | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | 維護模式 | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | 一般可用性 | 2024-12-15 | -- | -- | 

# 使用 Amazon Kinesis Data Streams API 搭配 開發生產者 適用於 Java 的 AWS SDK
<a name="developing-producers-with-sdk"></a>

您可以使用 Amazon Kinesis Data Streams API 搭配適用於 Java 的 AWS SDK 來開發生產者。如果您是 Kinesis Data Streams 的新手，請先熟悉一下 [什麼是 Amazon Kinesis Data Streams？](introduction.md) 及 [使用 AWS CLI 執行 Amazon Kinesis Data Streams 操作](getting-started.md) 所介紹的概念和術語。

本文範例會討論 [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) 並使用[適用於 Java 的AWS SDK](https://aws.amazon.com/sdk-for-java/) 將資料加入 (放入) 串流。不過，對於大部分的使用案例，則應使用 Kinesis Data Streams KPL 程式庫為宜。如需詳細資訊，請參閱[使用 Amazon Kinesis Producer Library (KPL) 開發生產者](developing-producers-with-kpl.md)。

本章的 Java 範例程式碼示範如何執行基本的 Kinesis Data Streams API 操作，並依操作類型按照邏輯進行劃分。這些範例不代表可立即生產的程式碼，無法檢查出所有可能的例外狀況，也不可視為任何潛在安全或效能疑慮的原因。此外，您亦可使用其他程式設計語言呼叫 [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/)。如需所有 AWS SDKs的詳細資訊，請參閱[使用 Amazon Web Services 開始開發](https://aws.amazon.com/developers/getting-started/)。

每項任務皆有其先決條件；例如，若要加入資料至串流，您必須先建立串流，而建立串流則需事先建立用戶端。如需詳細資訊，請參閱[建立和管理 Kinesis 資料串流](working-with-streams.md)。

**Topics**
+ [將資料新增至串流](#kinesis-using-sdk-java-add-data-to-stream)
+ [使用 AWS Glue 結構描述登錄檔與資料互動](kinesis-integration-glue-schema-registry.md)

## 將資料新增至串流
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

一旦建立了串流之後，您即可將資料以記錄的形式加入至該串流。記錄是一種資料結構，其中包含所要處理的資料 Blob 形式的資料。當您將資料存放於記錄後，Kinesis Data Streams 即絲毫不會檢查、解譯或變更該資料。每筆記錄也各有其相關聯的序號和分割區索引鍵。

Kinesis Data Streams API 提供兩種不同的操作可新增資料至串流：[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 和 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)。`PutRecords` 操作會透過 HTTP 請求將多筆記錄傳送至您的串流，而單數的 `PutRecord` 操作則是一次傳送一筆記錄至您的串流 (需針對每筆記錄發出單獨的 HTTP 請求)。大多數應用程式均應使用 `PutRecords` 為宜，因為這將使每個資料生產者達到更高的傳輸量。如需上述各項操作的詳細資訊，請分別參閱以下各小節。

**Topics**
+ [使用 PutRecords 新增多筆記錄](#kinesis-using-sdk-java-putrecords)
+ [使用 PutRecord 新增單一記錄](#kinesis-using-sdk-java-putrecord)

務請切記，若您的來源應用程式使用 Kinesis Data Streams API 加入資料至串流，很可能會有一個或多個取用者應用程式同時處理從串流取出的資料。如需有關取用者如何使用 Kinesis Data Streams API 取得資料的詳細資訊，請參閱 [從串流取得資料](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data)。

**重要**  
[變更資料保留期間](kinesis-extended-retention.md)

### 使用 PutRecords 新增多筆記錄
<a name="kinesis-using-sdk-java-putrecords"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 操作會透過單次請求將多筆記錄傳送至 Kinesis Data Streams。藉由使用 `PutRecords`，生產者傳送資料至其 Kinesis 資料串流時將可達到更高的輸送量。每個 `PutRecords` 請求最高可支援 500 筆記錄。請求中的每筆記錄最大可為 1 MB，整個請求的最高限制為 5 MB，包括分區索引鍵。如同以下所述的單數 `PutRecord` 操作，`PutRecords` 也使用序號和分割區索引鍵。不過，`PutRecord` 的 `SequenceNumberForOrdering` 參數並未包含在 `PutRecords` 呼叫中。`PutRecords` 操作將嘗試依照請求的自然順序處理所有記錄。

每筆資料記錄都有獨一無二的序號。此序號是由 Kinesis Data Streams 在您呼叫 `client.putRecords` 將資料記錄加入至串流後所指派。同一分割區索引鍵的序號通常會隨著時間而增加；逐次 `PutRecords` 請求的間隔期間愈長，序號將變得愈大。

**注意**  
序號不能用做為同一串流中各資料集的索引。若要按照邏輯分隔資料集，請使用分割區索引鍵或為每個資料集建立個別串流。

`PutRecords` 請求可以附上具有不同分割區索引鍵的記錄。請求是以整個串流當成範圍；每次請求均能附上任意組合的分割區索引鍵和記錄，總數最多可達到請求的限額。使用許多不同的分割區索引鍵對具有許多不同碎片的串流發出請求，通常會比使用少量的分割區索引鍵對少量的碎片發出請求更快。分割區索引鍵數目應該要比碎片數目大得多，以減少延遲並達到最高的傳輸量。

#### PutRecords 範例
<a name="kinesis-using-sdk-java-putrecords-example"></a>

以下程式碼會建立 100 筆具有循序分割區索引鍵的資料記錄，並將其放入名為 `DataStream` 的串流。

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

`PutRecords` 回應包含回應 `Records` 的陣列。回應陣列中的每筆記錄依照自然順序 (請求和回應的內容由上而下) 與請求陣列中的某一記錄直接相關。回應 `Records` 陣列一律包含與請求陣列相同數目的記錄。

#### 使用 PutRecords 時處理失敗
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

預設情況下，請求中個別記錄的失敗不會造成停止處理 `PutRecords` 請求中的後續記錄。這表示回應 `Records` 陣列包含已成功處理和未成功處理的記錄。您必須偵測未成功處理的記錄，並將其納入到後續呼叫中。

成功的記錄包含 `SequenceNumber` 和 `ShardID` 值，未成功的記錄則包含 `ErrorCode` 和 `ErrorMessage` 值。`ErrorCode` 參數反映錯誤類型，且可能是下列值的其中之一：`ProvisionedThroughputExceededException` 或 `InternalFailure`。`ErrorMessage` 提供關於`ProvisionedThroughputExceededException` 例外的更詳細資訊，包括帳戶 ID、串流名稱、以及遭節制的記錄之碎片 ID。以下範例所示的 `PutRecords` 請求中有三筆記錄。第二筆記錄發生失敗，會反映在回應中。

**Example PutRecords 請求語法**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords 回應語法**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

未成功處理的記錄可納入到後續的 `PutRecords` 請求中。首先，查看 `putRecordsResult` 中的 `FailedRecordCount` 參數以確認請求中是否有失敗的記錄。若有，即應將 `putRecordsEntry` 非 `ErrorCode` 的每個 `null` 加入至後續的請求。如需此處理常式類型的範例，請參閱以下程式碼。

**Example PutRecords 失敗處理常式**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### 使用 PutRecord 新增單一記錄
<a name="kinesis-using-sdk-java-putrecord"></a>

逐次呼叫 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) 對單一記錄進行操作。`PutRecords`所述的 [使用 PutRecords 新增多筆記錄](#kinesis-using-sdk-java-putrecords) 操作方為首選，除非您的應用程式具體需要始終透過單次請求傳送單一記錄，或因其他緣故無法使用 `PutRecords`。

每筆資料記錄都有獨一無二的序號。此序號是由 Kinesis Data Streams 在您呼叫 `client.putRecord` 將資料記錄加入至串流後所指派。同一分割區索引鍵的序號通常會隨著時間而增加；逐次 `PutRecord` 請求的間隔期間愈長，序號將變得愈大。

 快速連續進行放置操作時，不保證傳回的序號會增加，因為放置操作對 Kinesis Data Streams 基本上是同時發生。為保證同一分割區索引鍵的序號嚴格遞增，請使用 `SequenceNumberForOrdering` 參數，如 [PutRecord 範例](#kinesis-using-sdk-java-putrecord-example)的程式碼範例所示。

 無論您是否使用 `SequenceNumberForOrdering`，Kinesis Data Streams 透過 `GetRecords` 呼叫接收的記錄都將依照序號嚴格排序。

**注意**  
序號不能用做為同一串流中各資料集的索引。若要按照邏輯分隔資料集，請使用分割區索引鍵或為每個資料集建立個別串流。

分割區索引鍵用於將串流中的資料分組。資料記錄是根據其分割區索引鍵指派給串流中的碎片。具體而言，Kinesis Data Streams 使用分割區索引鍵做為雜湊函數的輸入，由該函數將分割區索引鍵 (和相關聯的資料) 對應到特定碎片。

 經過此雜湊處理機制，具有相同分割區索引鍵的所有資料記錄會對應到串流中的同一碎片。然而，若分割區索引鍵數目多過碎片數目，某些碎片即必定包含具有不同分割區索引鍵的記錄。從設計的角度來看，為確保您的所有碎片獲得充分利用，碎片數目 (由 `setShardCount` 的 `CreateStreamRequest` 方法指定) 應遠少於獨一分割區索引鍵的數目，且流向單一分割區索引鍵的資料量應遠少於碎片容量。

#### PutRecord 範例
<a name="kinesis-using-sdk-java-putrecord-example"></a>

以下程式碼會建立 10 筆跨兩個分割區索引鍵分佈的資料記錄，並將其放入名為 `myStreamName` 的串流。

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

上述程式碼範例使用 `setSequenceNumberForOrdering` 保證每個分割區索引鍵內的順序嚴格遞增。為求有效使用此參數，將目前記錄 `SequenceNumberForOrdering` (記錄 *n*) 設為前一記錄 (記錄 *n-1*) 的序號。為取得已加入至串流的記錄其序號，則對 `getSequenceNumber` 的結果呼叫 `putRecord`。

`SequenceNumberForOrdering` 參數可確保嚴格遞增分割區索引鍵的序號。`SequenceNumberForOrdering` 不提供跨多個分割區索引鍵的記錄排序。

# 使用 AWS Glue 結構描述登錄檔與資料互動
<a name="kinesis-integration-glue-schema-registry"></a>

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄檔整合。 AWS Glue 結構描述登錄檔可讓您集中探索、控制和發展結構描述，同時確保已註冊結構描述持續驗證產生的資料。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄檔可讓您改善串流應用程式中end-to-end資料品質和資料控管。如需詳細資訊，請參閱 [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)。設定此整合的其中一個方法是透過 AWS Java SDK 中提供的 `PutRecords` 和 `PutRecord` Kinesis Data Streams API。

如需如何使用 PutRecords 和 PutRecord Kinesis Data Streams APIs 設定 Kinesis Data Streams 與結構描述登錄整合的詳細說明，請參閱[使用案例：整合 Amazon Kinesis Data Streams 與 AWS Glue 結構描述登錄](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)檔中的「使用 Kinesis Data Streams APIs 與資料互動」一節。

# 使用 Kinesis Agent 寫入 Amazon Kinesis Data Streams
<a name="writing-with-agents"></a>

Kinesis 代理程式是獨立的 Java 軟體應用程式，可讓您輕鬆收集資料並將資料傳送至 Kinesis Data Streams。此代理程式將持續監控一組檔案，並且傳送新資料到您的串流。代理程式會處理檔案輪換、檢查點，並在故障時重試。它以可靠、及時和簡單的方式提供所有資料。它也會發出 Amazon CloudWatch 指標，協助您更有效地監控串流程序並進行疑難排解。

根據預設，記錄會從各個檔案根據換行符號 (`'\n'`) 字元進行剖析。不過，代理程式也可以設定為剖析多行記錄 (請參閱[指定代理程式組態設定](#agent-config-settings))。

您可以在以 Linux 為基礎的伺服器環境安裝代理程式，例如 Web 伺服器、日誌伺服器，及資料庫伺服器。安裝代理程式後，請透過指定要監控的檔案和資料的串流以進行設定。代理程式設定妥後，其將持續從檔案收集資料並以可靠的方式傳送資料至串流。

**Topics**
+ [完成 Kinesis Agent 的先決條件](#prereqs)
+ [下載並安裝代理程式](#download-install)
+ [設定和啟動代理程式](#config-start)
+ [指定代理程式組態設定](#agent-config-settings)
+ [監控多個檔案目錄並寫入多個串流](#sim-writes)
+ [使用代理程式預先處理資料](#pre-processing)
+ [使用代理程式 CLI 命令](#cli-commands)
+ [常見問答集](#agent-faq)

## 完成 Kinesis Agent 的先決條件
<a name="prereqs"></a>
+ 您的作業系統必須是 Amazon Linux AMI 2015.09 版或更新版本，或 Red Hat Enterprise Linux 版本 7 或更新版本。
+ 如果您使用 Amazon EC2 執行您的代理程式，則請啟動您的 EC2 執行個體。
+ 使用下列其中一種方法管理您的 AWS 登入資料：
  + 當您啟動 EC2 執行個體時，指定 IAM 角色。
  + 設定代理程式時指定 AWS 登入資料 （請參閱 [awsAccessKeyId](#awsAccessKeyId) 和 [awsSecretAccessKey](#awsSecretAccessKey))。
  + 編輯 `/etc/sysconfig/aws-kinesis-agent` 以指定您的區域和 AWS 存取金鑰。
  + 如果您的 EC2 執行個體位於不同的 AWS 帳戶中，請建立 IAM 角色以提供 Kinesis Data Streams 服務的存取權，並在設定代理程式時指定該角色 （請參閱 [assumeRoleARN](#assumeRoleARN) 和 [assumeRoleExternalId](#assumeRoleExternalId))。使用上述其中一種方法來指定其他帳戶中具有擔任此角色許可之使用者的 AWS 登入資料。
+ 您指定的 IAM 角色或 AWS 登入資料必須具有執行 Kinesis Data Streams [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 操作的許可，代理程式才能將資料傳送至您的串流。若您啟用 CloudWatch 監控代理程式，則另需具備執行 CloudWatch [PutMetricData](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html) 操作的許可。如需詳細資訊，請參閱 [使用 IAM 控制對 Amazon Kinesis Data Streams 資源的存取](controlling-access.md)、[使用 Amazon CloudWatch 監控 Kinesis Data Streams Agent 運作狀態](agent-health.md) 和 [CloudWatch 存取控制](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/UsingIAM.html)。

## 下載並安裝代理程式
<a name="download-install"></a>

首先，連接至您的執行個體。如需詳細資訊，請參閱《*Amazon EC2 使用者指南*》中的[連線至您的執行個體](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.html)。如果您無法連線，請參閱《*Amazon EC2 使用者指南*》中的[連線至執行個體的故障診斷](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html)。

**使用 Amazon Linux AMI 設定代理程式**  
使用以下命令來下載和安裝代理程式：

```
sudo yum install –y aws-kinesis-agent
```

**使用 Red Hat Enterprise Linux 設定代理程式**  
使用以下命令來下載和安裝代理程式：

```
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
```

**使用 GitHub 設定代理程式**

1. 從 [awlabs/amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent) 下載代理程式。

1. 瀏覽到下載目錄並執行下列命令以安裝代理程式：

   ```
   sudo ./setup --install
   ```

**若要在 Docker 容器中設定代理程式**  
Kinesis 代理程式也可以透過 [amazonlinux](https://docs.aws.amazon.com/AmazonECR/latest/userguide/amazon_linux_container_image.html) 容器基礎在容器中執行。使用以下 Dockerfile，然後執行 `docker build`。

```
FROM amazonlinux

RUN yum install -y aws-kinesis-agent which findutils
COPY agent.json /etc/aws-kinesis/agent.json

CMD ["start-aws-kinesis-agent"]
```

## 設定和啟動代理程式
<a name="config-start"></a>

**設定和啟動代理程式**

1. 開啟並編輯組態檔案 (如果使用預設檔案存取許可，即以超級使用者身分執行)：`/etc/aws-kinesis/agent.json`

   在此組態檔案中，指定代理程式從中收集資料的檔案 (`"filePattern"`)，以及代理程式將向其傳送資料的串流名稱 (`"kinesisStream"`)。請注意，檔案名稱是一種模式，代理程式可辨識檔案輪換。您可以輪換檔案或建立新的檔案，每秒不超過一次。代理程式利用檔案建立時間戳記以判斷要追蹤哪些檔案，然後傳送至您的串流；如果每秒建立新檔案或輪換檔案超過一次，將導致代理程式無法正確區分這些檔案。

   ```
   { 
      "flows": [
           { 
               "filePattern": "/tmp/app.log*", 
               "kinesisStream": "yourkinesisstream"
           } 
      ] 
   }
   ```

1. 手動啟動代理程式：

   ```
   sudo service aws-kinesis-agent start
   ```

1. (選用) 設定代理程式在系統啟動時開始執行：

   ```
   sudo chkconfig aws-kinesis-agent on
   ```

代理程式現在已做為系統服務在背景執行。其將持續監控指定的檔案，並將資料傳送至指定的串流。代理程式的活動記錄於 `/var/log/aws-kinesis-agent/aws-kinesis-agent.log`。

## 指定代理程式組態設定
<a name="agent-config-settings"></a>

代理程式支援兩種必要的組態設定 `filePattern` 和 `kinesisStream`，以及用於其他功能的選用組態設定。您可以由 `/etc/aws-kinesis/agent.json` 指定必要及選用的組態。

當您變更組態檔案時，必須使用下列命令停止及啟動代理程式：

```
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
```

或者，您可以使用下列命令：

```
sudo service aws-kinesis-agent restart
```

以下是一般組態設定。


| 組態設定 | Description | 
| --- | --- | 
| <a name="assumeRoleARN"></a>assumeRoleARN |  要由使用者擔任的角色 ARN。如需詳細資訊，請參閱《[IAM 使用者指南》中的使用 IAM 角色跨 AWS 帳戶委派存取權](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html)。 **  | 
| <a name="assumeRoleExternalId"></a>assumeRoleExternalId |  選用的識別符決定誰可以擔任此角色。如需詳細資訊，請參閱《IAM 使用者指南》**中的[如何使用外部 ID](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html)。  | 
| <a name="awsAccessKeyId"></a>awsAccessKeyId |  AWS 會覆寫預設登入資料的存取金鑰 ID。此設定優先於所有其他登入資料供應商。  | 
| <a name="awsSecretAccessKey"></a>awsSecretAccessKey |  AWS 覆寫預設登入資料的私密金鑰。此設定優先於所有其他登入資料供應商。  | 
| cloudwatch.emitMetrics |  如設定為 (true)，將啟用代理程式發出指標至 CloudWatch。 預設：true  | 
| cloudwatch.endpoint |  適用於 CloudWatch 的區域端點。 預設：`monitoring.us-east-1.amazonaws.com`  | 
| kinesis.endpoint |  適用於 Kinesis Data Streams 的區域端點。 預設：`kinesis.us-east-1.amazonaws.com`  | 

以下是流程組態設定。


| 組態設定 | Description | 
| --- | --- | 
| dataProcessingOptions |  將每個剖析的記錄傳送到串流之前會套用於這些記錄的處理選項的清單。此處理選項會在指定的資料夾執行。如需詳細資訊，請參閱[使用代理程式預先處理資料](#pre-processing)。  | 
| kinesisStream |  [必要] 串流的名稱。  | 
| filePattern |  【必要】 必須相符的目錄和檔案模式，才能由代理程式挑選。符合此模式的所有檔案皆需將讀取許可授予 `aws-kinesis-agent-user`。對於包含這些檔案的目錄，必須將讀取和執行許可授予 `aws-kinesis-agent-user`。  | 
| initialPosition |  檔案開始進行剖析的初始位置。有效值為 `START_OF_FILE` 和 `END_OF_FILE`。 預設：`END_OF_FILE`  | 
| maxBufferAgeMillis |  代理程式將資料傳送到串流之前先緩衝資料的時間上限 (毫秒)。 數值範圍：1,000 到 900,000 (1 秒到 15 分鐘) 預設：60,000 (1 分鐘)  | 
| maxBufferSizeBytes |  代理程式將資料傳送到串流之前先緩衝資料的容量上限 (位元組)。 數值範圍：1 到 4,194,304 (4 MB) 預設：4,194,304 (4 MB)  | 
| maxBufferSizeRecords |  代理程式將資料傳送到串流之前先緩衝資料的記錄數上限。 數值範圍：1 到 500 預設：500  | 
| minTimeBetweenFilePollsMillis |  代理程式輪詢和剖析檔案以找出新資料的時間間隔 (以毫秒為單位)。 數值範圍：1 或以上 預設：100  | 
| multiLineStartPattern |  用於識別記錄開始處的模式。記錄是由符合模式的一列及不符合模式的任何幾列所組成。有效值為常規運算式。根據預設，每個新日誌檔中的新列會剖析為一筆記錄。  | 
| partitionKeyOption |  產生分割區索引鍵的方法。有效值為 `RANDOM` (隨機產生的整數) 和 `DETERMINISTIC` (根據資料計算得出的雜湊值)。 預設：`RANDOM`  | 
| skipHeaderLines |  代理程式剖析監控檔案開頭部分時略過的列數。 數值範圍：0 或以上 預設：0 (零)  | 
| truncatedRecordTerminator |  記錄大小超過 Kinesis Data Streams 記錄大小限制時，代理程式將用來截斷剖析之記錄的字串。(1,000 KB) 預設：`'\n'` (換行符號)  | 

## 監控多個檔案目錄並寫入多個串流
<a name="sim-writes"></a>

透過指定多個流程組態設定，您可以設定代理程式來監控多個檔案目錄，然後將資料傳送到多個串流。在下列組態範例中，代理程式會監控兩個檔案目錄，並分別將資料傳送至 Kinesis 串流和 Firehose 交付串流。請注意，您可以為 Kinesis Data Streams 和 Firehose 指定不同的端點，因此 Kinesis 串流和 Firehose 交付串流不需要位於相同的區域。

```
{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://your/kinesis/endpoint", 
    "firehose.endpoint": "https://your/firehose/endpoint", 
    "flows": [
        {
            "filePattern": "/tmp/app1.log*", 
            "kinesisStream": "yourkinesisstream"
        }, 
        {
            "filePattern": "/tmp/app2.log*",
            "deliveryStream": "yourfirehosedeliverystream" 
        }
    ] 
}
```

如需搭配 Firehose 使用代理程式的詳細資訊，請參閱[搭配 Kinesis 代理程式寫入 Amazon Data Firehose](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html)。

## 使用代理程式預先處理資料
<a name="pre-processing"></a>

代理程式可預先處理經由受監控檔案所剖析的記錄，然後再將其傳送至您的串流。您可以將 `dataProcessingOptions` 組態設定新增到您的檔案流程以啟用此功能。可新增一或多個處理選項，這些選項將會依照指定的順序執行。

代理程式支援下列處理選項。由於代理程式是開放原始碼，您可以進一步開發和擴展其處理選項。您可以從 [Kinesis 代理程式](https://github.com/awslabs/amazon-kinesis-agent)下載代理程式。處理選項

`SINGLELINE`  
藉由移除換行字元、前方空格及結尾空格，將多列記錄轉換為單列記錄。  

```
{
    "optionName": "SINGLELINE"
}
```

`CSVTOJSON`  
將記錄從分隔符號區隔格式轉換為 JSON 格式。  

```
{
    "optionName": "CSVTOJSON",
    "customFieldNames": [ "field1", "field2", ... ],
    "delimiter": "yourdelimiter"
}
```  
`customFieldNames`  
[必要] 欄位名稱在每個 JSON 鍵值對中做為鍵。例如，若您指定 `["f1", "f2"]`，記錄「v1, v2」將轉換為 `{"f1":"v1","f2":"v2"}`。  
`delimiter`  
在記錄做為分隔符號的字串。預設為逗號 (,)。

`LOGTOJSON`  
將記錄從日誌格式轉換為 JSON 格式。支援的日誌格式為 **Apache Common Log**、**Apache Combined Log**、**Apache Error Log**、以及 **RFC3164 Syslog**。  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "logformat",
    "matchPattern": "yourregexpattern",
    "customFieldNames": [ "field1", "field2", … ]
}
```  
`logFormat`  
[必要] 日誌項目格式。以下是可能的值：  
+ `COMMONAPACHELOG` – Apache Common Log 格式。根據預設，每個日誌項目皆有以下模式：「`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}`」。
+ `COMBINEDAPACHELOG` – Apache Combined Log 格式。根據預設，每個日誌項目皆有以下模式：「`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}`」。
+ `APACHEERRORLOG` – Apache Error Log 格式。根據預設，每個日誌項目皆有以下模式：「`[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}`」。
+ `SYSLOG` – RFC3164 Syslog 格式。根據預設，每個日誌項目皆有以下模式：「`%{timestamp} %{hostname} %{program}[%{processid}]: %{message}`」。  
`matchPattern`  
用於從日誌項目擷取值的正規運算式模式。如果您的日誌項目不屬於任一種預先定義的日誌格式，則將使用此設定。使用此設定時，您還必須指定 `customFieldNames`。  
`customFieldNames`  
自訂欄位名稱在每個 JSON 鍵值對中做為鍵。您可以使用此設定來定義從 `matchPattern` 擷取的值的欄位名稱，或覆寫預先定義的日誌格式的預設欄位名稱。

**Example ：LOGTOJSON 組態**  <a name="example-logtojson"></a>
這裡提供一個 Apache Common Log 項目轉換為 JSON 格式的 `LOGTOJSON` 組態範例：  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG"
}
```
轉換前：  

```
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
```
轉換後：  

```
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
```

**Example ：使用自訂欄位的 LOGTOJSON 組態**  <a name="example-logtojson-custom-fields"></a>
以下是另一個 `LOGTOJSON` 組態範例：  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
}
```
使用此組態設定，前一個範例的相同 Apache Common Log 項目轉換為 JSON 格式如下：  

```
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
```

**Example ：轉換 Apache Common Log 項目**  <a name="example-apache-common-log-entry"></a>
以下流程組態將 Apache Common Log 項目轉換為 JSON 格式的單列記錄：  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "dataProcessingOptions": [
                {
                    "optionName": "LOGTOJSON",
                    "logFormat": "COMMONAPACHELOG"
                }
            ]
        }
    ] 
}
```

**Example ：轉換多列記錄**  <a name="example-convert-multiline"></a>
以下流程組態剖析第一行從「`[SEQUENCE=`」開始的多列記錄。每筆記錄都會先轉換為單列記錄。然後，根據定位鍵分隔符號從記錄中擷取值。擷取的值會對應到指定的 `customFieldNames` 值以形成 JSON 格式的單列記錄。  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "multiLineStartPattern": "\\[SEQUENCE=",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "field1", "field2", "field3" ],
                    "delimiter": "\\t"
                }
            ]
        }
    ] 
}
```

**Example ：使用匹配模式的 LOGTOJSON 組態**  <a name="example-logtojson-match-pattern"></a>
以下是 Apache Common Log 項目轉換為 JSON 格式的 `LOGTOJSON` 組態範例，省略最後欄位 (位元組)：  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})",
    "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"]
}
```
轉換前：  

```
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
```
轉換後：  

```
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
```

## 使用代理程式 CLI 命令
<a name="cli-commands"></a>

在系統啟動時自動開始執行代理程式：

```
sudo chkconfig aws-kinesis-agent on
```

檢查代理程式的狀態：

```
sudo service aws-kinesis-agent status
```

停止代理程式：

```
sudo service aws-kinesis-agent stop
```

從這個位置讀取代理程式的日誌檔案：

```
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
```

解除安裝代理程式：

```
sudo yum remove aws-kinesis-agent
```

## 常見問答集
<a name="agent-faq"></a>

### 是否有適用於 Windows 的 Kinesis 代理程式？
<a name="agent-faq-1"></a>

[適用於 Windows 的 Kinesis 代理程式](https://docs.aws.amazon.com/kinesis-agent-windows/latest/userguide/what-is-kinesis-agent-windows.html)是不同於適用於 Linux 平台的 Kinesis 代理程式的軟體。

### 為什麼 Kinesis 代理程式會減速和/或 `RecordSendErrors` 增加？
<a name="agent-faq-2"></a>

這通常是由於來自 Kinesis 的限流。檢查 Kinesis Data Streams 的 `WriteProvisionedThroughputExceeded` 指標或 Firehose Delivery Streams 的 `ThrottledRecords` 指標。這些指標中從 0 開始的任何增量，均表示需要提升串流限制。如需詳細資訊，請參閱 [Kinesis Data Stream 限制](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)和 [Amazon Firehose 交付串流](https://docs.aws.amazon.com/firehose/latest/dev/limits.html)。

排除限流之後，請查看 Kinesis 代理程式是否設定為追蹤大量小型檔案。Kinesis 代理程式追蹤新檔案時會有延遲，因此 Kinesis 代理程式應追蹤少量較大的檔案。嘗試將日誌檔案合併至較大的檔案中。

### 為什麼我會遇到 `java.lang.OutOfMemoryError` 例外狀況？
<a name="agent-faq-4"></a>

Kinesis 代理程式沒有足夠的記憶體可以處理其目前的工作負載。嘗試增加 `/usr/bin/start-aws-kinesis-agent` 中的 `JAVA_START_HEAP` 和 `JAVA_MAX_HEAP` 並重新啟動代理程式。

### 為什麼我會遇到 `IllegalStateException : connection pool shut down` 例外狀況？
<a name="agent-faq-5"></a>

Kinesis 代理程式沒有足夠的連線可以處理其目前的工作負載。嘗試在位於 `/etc/aws-kinesis/agent.json` 的一般代理程式組態設定中增加 `maxConnections` 和 `maxSendingThreads`。這些欄位的預設值是可用執行期處理器的 12 倍。如需進階代理程式組態設定的詳細資訊，請參閱 [AgentConfiguration.java](https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/config/AgentConfiguration.java)。

### 如何使用 Kinesis 代理程式對另一個問題進行偵錯？
<a name="agent-faq-6"></a>

可以在 `/etc/aws-kinesis/log4j.xml` 中啟用 `DEBUG` 層級日誌。

### 我應該如何對 Kinesis Agent 進行設定？
<a name="agent-faq-7"></a>

`maxBufferSizeBytes` 越小，Kinesis 代理程式傳送資料的頻率就越高。這可能很好，因為這樣會減少記錄的交付時間，但也增加了 Kinesis 的每秒請求。

### 為什麼 Kinesis 代理程式傳送重複的日誌？
<a name="agent-faq-8"></a>

發生這種情況是由於檔案追蹤組態錯誤。請確保每個 `fileFlow’s filePattern` 僅與一個檔案相符。如果正在 `copytruncate` 模式中使用 `logrotate` 模式下，也可能發生這種情況。嘗試將模式變更為預設模式，或建立模式以避免重複。如需有關處理重複記錄的詳細資訊，請參閱[處理重複記錄](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html)。

# 使用其他服務寫入 Kinesis Data Streams AWS
<a name="using-other-services"></a>

下列 AWS 服務可以直接與 Amazon Kinesis Data Streams 整合，以將資料寫入 Kinesis 資料串流。檢閱您感興趣的每個服務的資訊，並參考提供的參考。

**Topics**
+ [使用 寫入 Kinesis Data Streams AWS Amplify](using-other-services-amplify.md)
+ [使用 Amazon Aurora 寫入 Kinesis Data Streams](using-other-services-aurora.md)
+ [使用 Amazon CloudFront 寫入 Kinesis Data Streams](using-other-services-CloudFront.md)
+ [使用 Amazon CloudWatch Logs 寫入 Kinesis Data Streams](using-other-services-cw-logs.md)
+ [使用 Amazon Connect 寫入 Kinesis Data Streams](using-other-services-connect.md)
+ [使用 寫入 Kinesis Data Streams AWS Database Migration Service](using-other-services-migration.md)
+ [使用 Amazon DynamoDB 寫入 Kinesis Data Streams](using-other-services-ddb.md)
+ [使用 Amazon EventBridge 寫入 Kinesis Data Streams](using-other-services-eventbridges.md)
+ [使用 寫入 Kinesis Data Streams AWS IoT Core](using-other-services-iot-core.md)
+ [使用 Amazon Relational Database Service 寫入 Kinesis Data Streams](using-other-services-rds.md)
+ [usingAmazon Pinpoint 寫入 Kinesis Data Streams](using-other-services-pinpoint.md)
+ [使用 Amazon Quantum Ledger Database (Amazon QLDB) 寫入 Kinesis Data Streams](using-other-services-quantum-ledger.md)

# 使用 寫入 Kinesis Data Streams AWS Amplify
<a name="using-other-services-amplify"></a>

您可以使用 Amazon Kinesis Data Streams 從使用 AWS Amplify 建置的行動應用程式串流資料，以進行即時處理。然後，您可以建立即時儀表板、擷取例外狀況並產生提醒、驅動建議，以及做出其他即時業務或營運決策。您也可以將資料傳送至其他 服務，例如 Amazon Simple Storage Service、Amazon DynamoDB 和 Amazon Redshift。

如需詳細資訊，請參閱 *AWS Amplify 開發人員中心*中的[使用 Amazon Kinesis](https://docs.amplify.aws/react/build-a-backend/more-features/analytics/streaming-data/)。

# 使用 Amazon Aurora 寫入 Kinesis Data Streams
<a name="using-other-services-aurora"></a>

您可以使用 Amazon Kinesis Data Streams 來監控 Amazon Aurora 資料庫叢集上的活動。使用資料庫活動串流，您的 Aurora 資料庫叢集可即時將活動推送至 Amazon Kinesis Data Stream。然後，可以建置應用程式以進行合規管理，以取用這些活動、進行稽核並產生提醒。您也可以使用 Amazon Firehose 來存放資料。

如需詳細資訊，請參閱《Amazon Aurora 資料庫開發人員指南》**中的[資料庫活動串流](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html)。

# 使用 Amazon CloudFront 寫入 Kinesis Data Streams
<a name="using-other-services-CloudFront"></a>

您可以使用 Amazon Kinesis Data Streams 搭配 CloudFront 即時日誌，並即時取得對分佈提出之請求的相關資訊。然後，您可以建置自己的 [Kinesis 資料串流取用](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)者，或使用 Amazon Data Firehose 將日誌資料傳送至 Amazon S3、Amazon Redshift、Amazon OpenSearch Service 或第三方日誌處理服務。

如需詳細資訊，請參閱《Amazon CloudFront 開發人員指南》中的[即時日誌](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html)。

# 使用 Amazon CloudWatch Logs 寫入 Kinesis Data Streams
<a name="using-other-services-cw-logs"></a>

您可以使用 CloudWatch 訂閱從 Amazon CloudWatch Logs 存取日誌事件的即時摘要，並將其交付至 Kinesis 資料串流以進行處理、分析和載入至其他系統。

如需更多資訊，請參閱**《Amazon CloudWatch Logs 使用者指南》中的[使用訂閱即時處理日誌資料](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Subscriptions.html)。

# 使用 Amazon Connect 寫入 Kinesis Data Streams
<a name="using-other-services-connect"></a>

您可以使用 Kinesis Data Streams，從 Amazon Connect 執行個體即時匯出聯絡人記錄和客服人員事件。您也可以從 Amazon Connect Customer Profiles 啟用資料串流，以自動接收 Kinesis 資料串流的更新，以建立新的設定檔或變更現有的設定檔。

然後，您可以建置取用者應用程式，以即時處理和分析資料。例如，使用聯絡記錄和客戶個人檔案資料，您可以讓來源系統資料 (例如 CRM 和行銷自動化工具) 與最新資訊保持同步。使用客服人員事件資料，您可以建立顯示客服人員資訊和事件的儀表板，並觸發特定客服人員活動的自訂通知。

如需詳細資訊，請參閱 **《Amazon Connect 管理員指南》中的[執行個體資料串流](https://docs.aws.amazon.com/connect/latest/adminguide/data-streaming.html)、[設定即時匯出](https://docs.aws.amazon.com/connect/latest/adminguide/set-up-real-time-export.html)和[客服人員事件串流](https://docs.aws.amazon.com/connect/latest/adminguide/agent-event-streams.html)。

# 使用 寫入 Kinesis Data Streams AWS Database Migration Service
<a name="using-other-services-migration"></a>

您可以使用 AWS Database Migration Service 將資料遷移至 Kinesis 資料串流。然後，可以構建取用者應用程式以即時處理資料記錄。您也可以輕鬆將資料傳送至其他 服務，例如 Amazon Simple Storage Service、Amazon DynamoDB 和 Amazon Redshift

如需詳細資訊，請參閱《AWS Database Migration Service 使用者指南》中的[使用 Kinesis Data Streams](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html)**。

# 使用 Amazon DynamoDB 寫入 Kinesis Data Streams
<a name="using-other-services-ddb"></a>

您可以使用 Amazon Kinesis Data Streams 來擷取 Amazon DynamoDB 的變更。Kinesis Data Streams 會擷取任何 DynamoDB 資料表中的項目層級修改，並將其複製到您選擇的 Kinesis 資料串流。您的取用者應用程式可以存取此串流，以即時檢視項目層級的變更，並在下游傳送這些變更，或根據內容採取動作。

如需詳細資訊，請參閱《Amazon DynamoDB 開發人員指南》**中的[ Kinesis Data Streams 如何與 DynamoDB 搭配運作](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html)。

# 使用 Amazon EventBridge 寫入 Kinesis Data Streams
<a name="using-other-services-eventbridges"></a>

使用 Kinesis Data Streams，您可以將 EventBridge 中的 AWS API 呼叫[事件](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html)傳送至串流、建置取用者應用程式，以及處理大量資料。您也可以在 EventBridge 管道中使用 Kinesis Data Streams 作為目標，並在選擇性篩選和充實之後，從其中一個可用來源交付串流記錄。

如需詳細資訊，請參閱**《Amazon EventBridge 使用者指南》中的[傳送事件至 Amazon Kinesis 串流](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-relay-events-kinesis-stream.html)和[ EventBridge 管道](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)。

# 使用 寫入 Kinesis Data Streams AWS IoT Core
<a name="using-other-services-iot-core"></a>

您可以使用 IoT AWS 規則動作，從 AWS IoT Core 中的 MQTT 訊息即時寫入資料。然後，您可以建置處理資料、分析其內容並產生提醒的應用程式，並將其傳遞至分析應用程式或其他 AWS 服務，

如需詳細資訊，請參閱**《AWS IoT Core 開發人員指南》中的 [Kinesis Data Streams](https://docs.aws.amazon.com/iot/latest/developerguide/kinesis-rule-action.html)。

# 使用 Amazon Relational Database Service 寫入 Kinesis Data Streams
<a name="using-other-services-rds"></a>

您可以使用 Amazon Kinesis Data Streams 來監控 Amazon RDS 執行個體上的活動。使用資料庫活動串流，Amazon RDS 會即時將活動推送至 Kinesis 資料串流。然後，可以建置應用程式以進行合規管理，以取用這些活動、進行稽核並產生提醒。您也可以使用 Amazon Data Firehose 來存放資料。

如需詳細資訊，請參閱《Amazon RDS 開發人員指南》**中的[資料庫活動串流](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/DBActivityStreams.html)。

# usingAmazon Pinpoint 寫入 Kinesis Data Streams
<a name="using-other-services-pinpoint"></a>

您可以將 Amazon Pinpoint 設定為將事件資料傳送至 Amazon Kinesis Data Streams。Amazon Pinpoint 可以傳送用於行銷活動、旅程、交易電子郵件和簡訊的事件資料。然後，您可以將資料擷取至分析應用程式中，或建置自己的取用者應用程式，這些應用程式會根據事件的內容採取動作。

如需詳細資訊，請參閱 《Amazon Pinpoint 開發人員指南》**中的[串流事件](https://docs.aws.amazon.com/pinpoint/latest/developerguide/event-streams.html)。

# 使用 Amazon Quantum Ledger Database (Amazon QLDB) 寫入 Kinesis Data Streams
<a name="using-other-services-quantum-ledger"></a>

您可以在 Amazon QLDB 中建立串流，擷取遞交至日誌的每個文件修訂，並即時將此資料交付至 Amazon Kinesis Data Streams。QLDB 串流是從總帳日誌傳送到 Kinesis 資料串流資源的連續資料流。然後，可以使用 Kinesis 串流平台或 Kinesis Client Library 來使用串流、處理資料記錄以及分析資料內容。QLDB 串流會以三種記錄類型將您的資料寫入 Kinesis Data Streams：`control`、`block summary` 和 `revision details`。

如需詳細資訊，請參閱《Amazon QLDB 開發人員指南》**中的[串流](https://docs.aws.amazon.com/qldb/latest/developerguide/streams.html)。

# 使用第三方整合寫入 Kinesis Data Streams
<a name="using-other-services-third-party"></a>

您可以使用與 Kinesis Data Streams 整合的下列其中一個第三方選項，將資料寫入 Kinesis Data Streams。選取您想要進一步了解的選項，並尋找相關文件的資源和連結。

**Topics**
+ [Apache Flink](using-other-services-flink.md)
+ [Fluentd](using-other-services-Fluentd.md)
+ [Debezium](using-other-services-Debezium.md)
+ [Oracle GoldenGate](using-other-services-Oracle-GoldenGate.md)
+ [Kafka Connect](using-other-services-kafka-connect.md)
+ [Adobe 體驗](using-other-services-adobe.md)
+ [Striim](using-other-services-Striim.md)

# Apache Flink
<a name="using-other-services-flink"></a>

Apache Flink 是一個架構和分散式處理引擎，用於對未限制和有限制資料串流進行狀態運算。如需從 Apache Flink 寫入 Kinesis Data Streams 的詳細資訊，請參閱 [Amazon Kinesis Data Streams 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/)。

# Fluentd
<a name="using-other-services-Fluentd"></a>

Fluentd 是用於統一日誌記錄層的開放原始碼資料收集器。如需從 Fluentd 寫入 Kinesis Data Streams 的詳細資訊。如需詳細資訊，請參閱[使用 Kinesis 處理串流](https://docs.fluentd.org/how-to-guides/kinesis-stream)。

# Debezium
<a name="using-other-services-Debezium"></a>

Debezium 是一個用於變更資料擷取的開放原始碼分散式平台。如需從 Debezium 寫入 Kinesis Data Streams 的詳細資訊，請參閱[將 MySQL 資料變更串流至 Amazon Kinesis](https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/)。

# Oracle GoldenGate
<a name="using-other-services-Oracle-GoldenGate"></a>

Oracle GoldenGate 是一種軟體產品，可讓您複製、篩選，以及將資料從一個資料庫轉換到另一個資料庫。如需從 Oracle GoldenGate 寫入 Kinesis Data Streams 的詳細資訊，請參閱[使用 Oracle GoldenGate 將資料複寫至 Kinesis 資料串流](https://blogs.oracle.com/dataintegration/post/data-replication-to-aws-kinesis-data-stream-using-oracle-goldengate)。

# Kafka Connect
<a name="using-other-services-kafka-connect"></a>

Kafka Connect 是在 Apache Kafka 和其他系統之間以可擴展和可靠方式串流資料的工具。如需將資料從 Apache Kafka 寫入 Kinesis Data Streams 的詳細資訊，請參閱 [Kinesis Kafka 連接器](https://github.com/awslabs/kinesis-kafka-connector)。

# Adobe 體驗
<a name="using-other-services-adobe"></a>

Adobe 體驗平台使組織能夠集中和標準化來自任何系統的客戶資料。然後，它會套用資料科學和機器學習，大幅改善豐富、個人化體驗的設計和交付。如需將資料從 Adobe 體驗平台寫入 Kinesis Data Streams 的詳細資訊。請參閱如何建立 [Amazon Kinesis 連線](https://experienceleague.adobe.com/docs/experience-platform/destinations/catalog/cloud-storage/amazon-kinesis.html?lang=en)。

# Striim
<a name="using-other-services-Striim"></a>

Striim 是一個完整的端對端記憶體平台，用於即時收集、篩選、轉換、富集、彙總、分析和交付資料。如需如何從 Striim 將資料寫入 Kinesis Data Streams 的詳細資訊，請參閱 [Kinesis 寫入器](https://www.striim.com/docs/en/kinesis-writer.html)。

# Amazon Kinesis Data Streams 生產者疑難排解
<a name="troubleshooting-producers"></a>

**Topics**
+ [我的生產者應用程式寫入的速度比預期慢](#producer-writing-at-slower-rate)
+ [我收到未經授權的 KMS 主金鑰許可錯誤](#unauthorized-kms-producer)
+ [對生產者的其他常見問題進行故障診斷](#misc-troubleshooting-producer)

## 我的生產者應用程式寫入的速度比預期慢
<a name="producer-writing-at-slower-rate"></a>

**Topics**
+ [超過服務限制](#service-limits-exceeded)
+ [我想要最佳化我的生產者](#producer-optimization)
+ [操作濫用 `flushSync()`](#misuse-tag)

### 超過服務限制
<a name="service-limits-exceeded"></a>

若要查明是否超出服務限制，請檢查您的生產者是否由服務擲回了傳輸量例外狀況，並查驗有哪些 API 操作受到調節。請切記，視呼叫而定將有不同的限制，具體如 [配額和限制](service-sizes-and-limits.md)所述。例如，除了讀寫操作眾所周知的碎片層級限制外，另有以下的串流層級限制：
+ [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)
+ [DeleteStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html)
+ [ListStreams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html)
+ [GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
+ [MergeShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html)
+ [DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)
+ [DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)

`CreateStream`、`DeleteStream`、`ListStreams`、`GetShardIterator` 和 `MergeShards` 操作的限制為每秒 5 次呼叫。`DescribeStream` 操作的限制為每秒 10 次呼叫。`DescribeStreamSummary` 操作的限制為每秒 20 次呼叫。

如果上述呼叫不存在問題，請確定您選取了能夠對所有碎片均勻地分佈 *put* 操作的分割區索引鍵，而且並無任何特定分割區索引鍵不慎達到了服務限制但其餘則未達到限制。對此，您將需要測量尖峰傳輸量並考量到串流中的碎片數目。如需如何管理串流的詳細資訊，請參閱[建立和管理 Kinesis 資料串流](working-with-streams.md)。

**提示**  
請記得，使用單一記錄的 [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) 操作時，輸送量限流計算要無條件進位至最接近的 KB 數，而多筆記錄的 [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 操作需對每次呼叫累計的記錄總和進行捨入。例如，放入 600 筆記錄共 1.1 KB 大小的 `PutRecords` 請求將不會受到調節。

### 我想要最佳化我的生產者
<a name="producer-optimization"></a>

開始最佳化生產者之前，請先完成下列關鍵任務。首先，根據記錄大小和每秒記錄筆數，確認所需的尖峰傳輸量。接著，排除串流容量為限制因素 ([超過服務限制](#service-limits-exceeded)) 的可能性。如果您排除了串流容量，則針對以下兩種常見類型的生產者使用相應的故障診斷技巧及最佳化準則。

**大型生產者**

大型生產者通常是從內部部署伺服器或 Amazon EC2 執行個體執行。需要由大型生產者提供較高傳輸量的消費者通常會在意每一記錄延遲。處理延遲的策略包括：如果客戶可以微批次/緩衝記錄，在使用單一記錄操作 PutRecord 之前，請使用 [Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) （具有進階彙總邏輯）、多記錄操作 PutRecords [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)或將記錄彙總到較大的檔案中。 [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) 如果您無法批次處理/緩衝，請使用多個執行緒同時寫入 Kinesis Data Streams 服務。 適用於 Java 的 AWS SDK 和其他 SDKs 包含可在極少程式碼下執行此操作的非同步用戶端。

**小型生產者**

小型生產者通常是行動應用程式、IoT 裝置或 web 用戶端。如果是行動應用程式，建議您在 AWS Mobile SDKs 中使用 `PutRecords`操作或 Kinesis Recorder。如需詳細資訊，請參閱 適用於 Android 的 AWS Mobile SDK 入門指南和 AWS Mobile SDK for iOS 入門指南。行動應用程式必須處理自身固有的斷續連線問題，而且需要某一類的批次 put 操作如 `PutRecords`。若您由於某些因素無法批次處理，請參閱上述「大型生產者」一節的資訊。如果您的生產者是瀏覽器，產生的資料量通常很少。不過，這樣是將 *put* 操作放在了應用程式的重要路徑上，而此做法並不建議。

### 操作濫用 `flushSync()`
<a name="misuse-tag"></a>

`flushSync()` 不正確地使用 可能會大幅影響寫入效能。`flushSync()` 操作專為關閉案例而設計，以確保在 KPL 應用程式終止之前傳送所有緩衝記錄。如果您在每次寫入操作後實作此操作，可能會增加大量額外延遲，每次寫入大約 500 毫秒。請確定您已`flushSync()`針對應用程式關閉實作 ，以避免寫入效能不必要的額外延遲。

## 我收到未經授權的 KMS 主金鑰許可錯誤
<a name="unauthorized-kms-producer"></a>

若生產者應用程式寫入已加密的串流但未具備 KMS 主金鑰的許可，便會發生此錯誤。若要為應用程式指派許可使其能夠存取 KMS 金鑰，請參閱[在 AWS KMS 中使用金鑰政策](https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html)及[搭配 AWS KMS 使用 IAM 政策](https://docs.aws.amazon.com/kms/latest/developerguide/iam-policies.html)。

## 對生產者的其他常見問題進行故障診斷
<a name="misc-troubleshooting-producer"></a>
+ [為何我的 Kinesis 資料串流會傳回 500 個內部伺服器錯誤？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-500-error/)
+ [如何對從 Flink 寫入 Kinesis Data Streams 時發生的逾時錯誤進行故障診斷？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/)
+ [如何疑難排解 Kinesis Data Streams 中的限流錯誤？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling-errors/)
+ [為何我的 Kinesis 資料串流會限流？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling/)
+ [如何使用 KPL 將資料記錄放入 Kinesis 資料串流中？](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-kpl/)

# 最佳化 Kinesis Data Streams 生產者
<a name="advanced-producers"></a>

您可以根據您看到的特定行為，進一步最佳化 Amazon Kinesis Data Streams 生產者。檢閱下列主題以識別解決方案。

**Topics**
+ [自訂 KPL 重試和速率限制行為](kinesis-producer-adv-retries-rate-limiting.md)
+ [將最佳實務套用至 KPL 彙總](kinesis-producer-adv-aggregation.md)

# 自訂 KPL 重試和速率限制行為
<a name="kinesis-producer-adv-retries-rate-limiting"></a>

當您使用 KPL 操作新增 Amazon Kinesis Producer Library (KPL) 使用者記錄時，系統會為記錄提供時間戳記，並新增至具有 `RecordMaxBufferedTime`組態參數所設定截止日期的緩衝區。 `addUserRecord()`此時間戳記/截止日期的組合設定了緩衝區優先順序。記錄將根據下列條件從緩衝區排清：
+ 緩衝區優先順序
+ 彙整組態
+ 收集組態

影響緩衝區行為的彙整和收集組態參數如下：
+ `AggregationMaxCount`
+ `AggregationMaxSize`
+ `CollectionMaxCount`
+ `CollectionMaxSize`

然後，清除的記錄會以 Amazon Kinesis Data Streams 記錄的形式，使用對 Kinesis Data Streams API 操作 `PutRecords` 的呼叫來傳送至 Kinesis 資料串流。`PutRecords` 操作向串流傳送的請求偶爾會完全失敗或局部失敗。失敗的記錄會自動加回到 KPL 緩衝區。將根據以下兩個值當中較小者設定新的截止日期：
+ 目前 `RecordMaxBufferedTime` 組態減半
+ 記錄的存留時間值

此策略使重試的 KPL 使用者記錄得以納入後續 Kinesis Data Streams API 呼叫中，既強制實施了 Kinesis Data Streams 記錄的存留時間值又能提高輸送量並降低複雜性。其間不涉及退避演算法，使得此策略成為相對積極的重試策略。因重試次數過多造成的垃圾郵件會受到速率限制的阻止，相關內容將於下一節談論。

## 速率限制
<a name="kinesis-producer-adv-retries-rate-limiting-rate-limit"></a>

KPL 包含速率限制功能，可限制從單一生產者傳送的每個碎片輸送量。速率限制是使用字符儲存貯體演算法搭配用於 Kinesis Data Streams 記錄和位元組的單獨儲存貯體進行實作。每次成功寫入 Kinesis 資料串流都會將一個字符 (或多個字符) 加入各儲存貯體，最多達到特定閾值。此閾值可供設定，但預設情況下設定的值將比實際碎片限額高出 50%，使得來自單一生產者的碎片能夠達到飽和。

您可降低此限制以減少因重試次數過多造成的垃圾郵件。然而，最佳實務是由每個生產者主動重試最大傳輸量，透過擴展串流容量並實作相應的分割區索引鍵策略來處理判定為過多的任何形成調節情況。

# 將最佳實務套用至 KPL 彙總
<a name="kinesis-producer-adv-aggregation"></a>

雖然產生的 Amazon Kinesis Data Streams 記錄序號方案保持不變， 彙總會導致彙總 Kinesis Data Streams 記錄中包含的 Amazon Kinesis Producer Library (KPL) 使用者記錄索引從 0 （零） 開始； 不過， 只要您不依賴序號來唯一識別您的 KPL 使用者記錄， 您的程式碼可以忽略這一點， 作為彙總 (KPL 使用者記錄的 Kinesis Data Streams 記錄） 和後續的取消彙總 (Kinesis Data Streams 記錄的 Kinesis Data Streams 記錄的 KPL 使用者記錄） 會自動為您處理此問題。無論您的消費者使用的是 KCL 或 AWS SDK，這都適用。若要使用此彙總功能，如果您的取用者是使用 AWS SDK 中提供的 API 進行寫入，則需要將 KPL 的 Java 部分提取到組建中。

若您打算使用序號做為 KPL 使用者記錄的唯一識別符，建議您使用 `Record` 和 `UserRecord` 所提供遵守合約的 `public int hashCode()` 及 `public boolean equals(Object obj)` 操作，對您的 KPL 使用者記錄進行比較。此外，如果想要檢查 KPL 使用者記錄的子序號，您可以將其轉換為 `UserRecord` 執行個體並擷取其子序號。

如需詳細資訊，請參閱[實作消費者取消彙總](kinesis-kpl-consumer-deaggregation.md)。