

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Python 中開發 Kinesis Client Library 取用者
<a name="kinesis-record-processor-implementation-app-py"></a>

**重要**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們**強烈建議**您使用 1.x 版將 KCL 應用程式遷移至 2026 年 1 月 30 日之前的最新 KCL 版本。若要尋找最新的 KCL 版本，請參閱 [ GitHub 上的 Amazon Kinesis Client Library 頁面](https://github.com/awslabs/amazon-kinesis-client)。如需最新 KCL 版本的資訊，請參閱 [使用 Kinesis 用戶端程式庫](kcl.md)。如需從 KCL 1.x 遷移至 KCL 3.x 的資訊，請參閱 [從 KCL 1.x 移轉到 KCL 3.x](kcl-migration-1-3.md)。

您可以使用 Kinesis Client Library (KCL) 建置應用程式，處理來自 Kinesis 資料串流的資料。Kinesis Client Library 支援多種語言。本主題將討論 Python。

KCL 是一種 Java 程式庫，使用稱為 *MultiLangDaemon* 的多語言介面提供對 Java 以外語言的支援。此常駐程式是以 Java 為基礎，並在您使用 Java 以外的 KCL 語言時在背景執行。因此，若您安裝了適用於 Python 的 KCL 並完全以 Python 撰寫取用者應用程式，則由於 MultiLangDaemon 的緣故，您的系統仍需要安裝 Java。此外，MultiLangDaemon 有一些預設設定，您可能需要針對您的使用案例進行自訂，例如其連線 AWS 的區域。如需 MultiLangDaemon 的詳細資訊，請前往 GitHub 上的 [KCL MultiLangDaemon 專案](https://github.com/awslabs/amazon-kinesis-client/tree/v1.x/src/main/java/com/amazonaws/services/kinesis/multilang)頁面。

若要從 GitHub 下載 Python KCL，請前往 [Kinesis Client Library (Python)](https://github.com/awslabs/amazon-kinesis-client-python)。如需下載 Python KCL 取用者應用程式的範本程式碼，請至 GitHub 前[往適用於 Python 的 KCL 範例專案](https://github.com/awslabs/amazon-kinesis-client-python/tree/master/samples)頁面。

以 Python 實作 KCL 取用者應用程式時，您必須完成以下任務：

**Topics**
+ [實作 RecordProcessor 類別方法](#kinesis-record-processor-implementation-interface-py)
+ [修改組態屬性](#kinesis-record-processor-initialization-py)

## 實作 RecordProcessor 類別方法
<a name="kinesis-record-processor-implementation-interface-py"></a>

`RecordProcess` 類別必須擴充 `RecordProcessorBase` 以實作下列方法。範例提供的實作可讓您用於做為起點 (請參閱 `sample_kclpy_app.py`)。

```
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
```

**initialize**  
KCL 將於記錄處理器執行個體化時呼叫 `initialize` 方法，傳遞特定碎片 ID 作為參數。此記錄處理器只會處理該碎片，且通常反過來說同樣成立 (該碎片僅由此記錄處理器處理)。然而，您的消費者應該考慮到資料記錄可能經過多次處理的情況。這是因為 Kinesis Data Streams 具有*至少一次*的語意，即碎片中的每一筆資料記錄至少會由取用者內的工作者處理一次。如需特定碎片可能由多個工作者處理之各種情況的詳細資訊，請參閱[使用重新分片、擴展和平行處理來變更碎片數量](kinesis-record-processor-scaling.md)。

```
def initialize(self, shard_id)
```

**process\$1records**  
 KCL 會呼叫此方法，傳遞由 `initialize` 方法所指定碎片中之資料記錄的清單。您所實作的記錄處理器根據消費者的語意處理這些記錄中的資料。例如，工作者可能會執行資料轉換，然後將結果存放至 Amazon Simple Storage Service (Amazon S3) 儲存貯體。

```
def process_records(self, records, checkpointer) 
```

除了資料本身外，記錄還包含序號和分割區索引鍵。工作者在處理資料時可使用這些值。例如，工作者可根據分割區索引鍵的值，選擇要存放資料的 S3 儲存貯體。`record` 字典公開了以下的索引鍵值組，可供存取記錄的資料、序號和分割區索引鍵：

```
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
```

請注意，資料為 Base64 編碼。

範例中，`process_records` 方法的程式碼示範了工作者如何能夠存取記錄的資料、序號和分割區索引鍵。

Kinesis Data Streams 需要由記錄處理器追蹤碎片中已經處理過的記錄。KCL 透過將 `Checkpointer` 物件傳遞給 `process_records` 為您進行這項追蹤。記錄處理器將對此物件呼叫 `checkpoint` 方法，以通知 KCL 目前處理碎片中的記錄之進度。如果工作者發生失敗，KCL 將使用此資訊於上一筆已知處理過的記錄處重新啟動碎片處理。

對於分割或合併操作，在原始碎片的處理器呼叫 `checkpoint` 以表示對原始碎片進行所有處理都已完成之前，KCL 不會開始處理新碎片。

如果您未傳遞參數，KCL 將假定對 `checkpoint` 的呼叫表示所有記錄皆已處理，一直處理到傳遞至記錄處理器的最後一筆記錄。因此，記錄處理器應僅在已處理過向其傳遞的清單中之所有記錄後才呼叫 `checkpoint`。記錄處理器不需要在每次呼叫 `checkpoint` 時呼叫 `process_records`。例如，處理器可以每呼叫三次該方法才呼叫一次 `checkpoint`。您可以選擇性指定某筆記錄的確切序號做為 `checkpoint` 的參數。在此情況下，KCL 將假定所有記錄皆已處理，僅止於處理到該記錄。

範例中，私有方法 `checkpoint` 示範了如何利用適當的例外狀況處理和重試邏輯來呼叫 `Checkpointer.checkpoint` 方法。

KCL 倚賴 `process_records` 以處理任何因處理資料記錄而引發的例外狀況。如果 `process_records` 擲回例外狀況，KCL 將略過例外狀況發生前已傳遞至 `process_records` 的資料記錄。也就是說，這些記錄不會重新傳送到擲回例外狀況的記錄處理器或消費者內的任何其他記錄處理器。

**shutdown**  
 KCL 會在處理結束 (關閉原因是 `TERMINATE`) 或工作者不再回應 (關閉 `reason` 為 `ZOMBIE`) 時呼叫 `shutdown` 方法。

```
def shutdown(self, checkpointer, reason)
```

當記錄處理器未能再從碎片接收任何記錄 (因為碎片已進行分割或合併或者串流已刪除) 時，處理即告結束。

 KCL 還會將 `Checkpointer` 物件傳遞給 `shutdown`。如果關閉 `reason` 是 `TERMINATE`，表示記錄處理器應已完成處理任何資料記錄，然後對此界面呼叫 `checkpoint` 方法。

## 修改組態屬性
<a name="kinesis-record-processor-initialization-py"></a>

範例提供了組態屬性的預設值。您可使用自訂值覆寫任何這些屬性 (請參閱 `sample.properties`)。

### Application name (應用程式名稱)
<a name="kinesis-record-processor-application-name-py"></a>

KCL 要求所有應用程式和同一區域內的 Amazon DynamoDB 資料表必須具有獨一無二的應用程式名稱。其使用應用程式名稱組態值的方式如下：
+ 假定所有與此應用程式名稱相關聯的工作者合作處理同一串流。這些工作者可分佈於多個執行個體。如果您以相同應用程式的程式碼執行另一執行個體但使用不同的應用程式名稱，KCL 便會將第二個執行個體視為亦對同一串流進行操作的完全獨立應用程式。
+ KCl 將使用應用程式名稱建立 DynamoDB 資料表並由該資料表維護應用程式的狀態資訊 (例如檢查點及工作者與碎片間對應)。每個應用程式都有其自身的 DynamoDB 資料表。如需詳細資訊，請參閱[使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)。

### 設定登入資料
<a name="kinesis-record-processor-creds-py"></a>

您必須將 AWS 登入資料提供給預設登入資料提供者鏈結中的其中一個登入資料提供者。您可以使用 `AWSCredentialsProvider` 屬性，設定登入資料供應者。[sample.properties](https://github.com/awslabs/amazon-kinesis-client-python/blob/master/samples/sample.properties) 必須向[預設登入資料供應者鏈結](https://docs.aws.amazon.com/sdk-for-java/latest/reference/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)中的某一登入資料供應者提供您的登入資料。如果您在 Amazon EC2 執行個體上執行取用者應用程式，建議您使用 IAM 角色設定執行個體。反映與此 IAM 角色相關聯許可的 AWS 憑證，可透過執行個體中繼資料提供給執行個體上的應用程式。以這種方式管理 EC2 執行個體上執行的消費者應用程式的登入資料最為安全。

範例的屬性檔案設定由 KCL 使用 `sample_kclpy_app.py` 所提供的記錄處理器，處理名為 "words" 的 Kinesis 資料串流。