

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# コンシューマーの集約解除を実装する
<a name="kinesis-kpl-consumer-deaggregation"></a>

KCL は、リリース 1.4.0 から KPL ユーザーレコードの自動集計解除をサポートしています。以前のバージョンの KCL で書かれたコンシューマーアプリケーションのコードは、KCL を更新した後、コードを何も修正せずにコンパイルできます。ただし、プロデューサー側で KPL の集約を使用している場合、チェックポイントが多少関係してきます。集約されたレコード内のすべてのサブレコードは同じシーケンス番号を持っているため、サブレコード間の区別が必要な場合、チェックポイントを使用して追加のデータを保存する必要があります。この追加データは、*サブシーケンス番号*と呼ばれます。

**Topics**
+ [以前のバージョンの KCL から移行する](#kinesis-kpl-consumer-deaggregation-migration)
+ [KPL の集約解除のための KCL の拡張を使用する](#kinesis-kpl-consumer-deaggregation-extensions)
+ [GetRecords を直接使用する](#kinesis-kpl-consumer-deaggregation-getrecords)

## 以前のバージョンの KCL から移行する
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

集約を使用している場合でも、既存のチェックポイント呼び出しを変更する必要はありません。Kinesis Data Streams に保存されているすべてのレコードを正しく取得できることが保証されています。以下で説明する特定のユースケースをサポートするために、現在 KCL には、2 つの新しいチェックポイントオペレーションが用意されています。

既存のコードが KPL サポート以前の KCL 用に書かれていて、チェックポイントオペレーションが引数なしで呼び出される場合、そのコードの動作は、バッチ内にある最後の KPL ユーザーレコードのシーケンス番号に対するチェックポイントの作成と同等です。シーケンス番号文字列を使用してチェックポイントオペレーションを呼び出す場合は、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチの指定されたシーケンス番号に対するチェックポイントの作成と同等です。

引数なしで新しいKCL チェックポイントオペレーション `checkpoint()` を呼び出すことは、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチ内の最後の `Record` 呼び出しのシーケンス番号に対するチェックポイントの作成と意味的に同等です。

新しい KCL チェックポイントオペレーション `checkpoint(Record record)` を呼び出すことは、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、指定された `Record` のシーケンス番号に対するチェックポイントの作成と意味的に同等です。`Record` 呼び出しが実際には `UserRecord` である場合、`UserRecord` のシーケンス番号とサブシーケンス番号にチェックポイントが作成されます。

新しい KCL チェックポイントオペレーション `checkpoint(String sequenceNumber, long subSequenceNumber)` を呼び出すと、指定されたシーケンス番号とサブシーケンス番号に明示的にチェックポイントが作成されます。

いずれの場合も、チェックポイントが Amazon DynamoDB チェックポイントテーブルに保存された後は、アプリケーションがクラッシュして再起動した場合、KCL により、レコードの取得が正常に再開されます。さらにレコードがシーケンス内に含まれている場合は、最後にチェックポイントが作成されたシーケンス番号が付けられているレコード内の次のサブシーケンス番号のレコードから取得が開始されます。前のシーケンス番号のレコードにある最後のサブシーケンス番号が、最新のチェックポイントに含まれている場合、その次のシーケンス番号が付けられているレコードから取得が開始されます。

次のセクションでは、レコードのスキップや重複を避けるために必要な、コンシューマーのシーケンスとサブシーケンスのチェックポイントの詳細について説明します。コンシューマーのレコード処理を停止し再起動するときに、レコードのスキップや重複が重要でない場合は、変更せずに既存のコードを実行してかまいません。

## KPL の集約解除のための KCL の拡張を使用する
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

KPL の集約解除ではサブシーケンスチェックポイントを使用できます。サブシーケンスチェックポイントを使いやすくするために、`UserRecord` クラスが KCL に追加されています。

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

このクラスは、現在 `Record` の代わりに使用されています。これは `Record` のサブクラスであるため、既存のコードは影響を受けません。`UserRecord` クラスは、実際のサブレコードと通常の集約されていないレコードの両方を表します。集約されていないレコードは、サブレコードを 1 つだけ含む集約されたレコードと考えることができます。

さらに、2 つの新しいオペレーションが `IRecordProcessorCheckpointer` に追加されています。

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

サブシーケンス番号チェックポイントの使用を開始するには、次の変更を行います。次のフォームコードを変更します。

```
checkpointer.checkpoint(record.getSequenceNumber());
```

新しいフォームコードは次のようになります。

```
checkpointer.checkpoint(record);
```

サブシーケンスチェックポイントでは、`checkpoint(Record record)` フォームを使用することをお勧めします。ただし、チェックポイントの作成で使用する文字列にすでに `sequenceNumbers` を保存している場合は、次の例に示すように、`subSequenceNumber` も保存する必要があります。

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

この実装では `UserRecord` を必ず使用するため、`UserRecord` から `Record` へのキャストは必ず成功します。シーケンス番号の計算を実行する必要がない場合、この方法はお勧めしません。

KPL ユーザーレコードの処理中に、CL は、サブシーケンス番号を Amazon DynamoDB に各行の追加フィールドとして書き込みます。以前のバージョンの KCL では、チェックポイントを再開するときに `AFTER_SEQUENCE_NUMBER` を使用してレコードを取得していました。KPL サポートを含む現在の KCL では、代わりに `AT_SEQUENCE_NUMBER` を使用します。チェックポイントが作成されたシーケンス番号のレコードを取得するとき、チェックポイントが作成されたサブシーケンス番号がチェックされ、サブレコードが必要に応じて削除されます (最後のサブレコードにチェックポイントが作成されている場合、すべてのサブレコードが削除されます)。ここでも、集約されていないレコードは、単一のサブレコードを含む集約されたレコードと考えることができ、集約されたレコードと集約されていないレコードの両方で同じアルゴリズムを使用できます。

## GetRecords を直接使用する
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

KCL の使用を選択せずに、API オペレーション `GetRecords` を直接呼び出して Kinesis Data Streams レコードを取得することもできます。これらの取得したレコードを元の KPL ユーザーレコードに解凍するには、`UserRecord.java` にある次の静的なオペレーションの 1 つを呼び出します。

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

最初のオペレーションでは、`startingHashKey` のデフォルト値 `0` (ゼロ) と `endingHashKey` のデフォルト値 `2^128 -1` を使用します。

これらの各オペレーションは、Kinesis Data Streams レコードの指定されたリストを KPL ユーザーレコードのリストに集約解除します。KPL ユーザーレコードの明示的なハッシュキーまたはパーティションキーが `startingHashKey` と `endingHashKey` の範囲 (境界を含む) 外にある場合、これらのユーザーレコードは、返されるレコードのリストから破棄されます。