カスタムオフセットストレージトピックを使用する - Amazon Managed Streaming for Apache Kafka

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

カスタムオフセットストレージトピックを使用する

ソースコネクタ間のオフセットの連続性を提供するために、デフォルトトピックの代わりに任意のオフセットストレージトピックを使用できます。オフセットストレージトピックを指定すると、前のコネクタの最後のオフセットから読み取りを再開するソースコネクタを作成するといったタスクを実行しやすくなります。

オフセットストレージトピックを指定するには、コネクタを作成する前にワーカー設定で offset.storage.topic プロパティの値を指定します。オフセットストレージトピックを再利用して以前に作成したコネクタのオフセットを利用する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。カスタムオフセットストレージトピックを作成する場合は、トピック設定で cleanup.policycompact に設定する必要があります。

注記

シンクコネクタの作成時にオフセットストレージトピックを指定すると、トピックがまだ存在しない場合は MSK Connect によってそのトピックが作成されます。ただし、このトピックはコネクタオフセットの保存には使用されません。

代わりに、シンクコネクタオフセットは Kafka コンシューマーグループプロトコルを使用して管理されます。各シンクコネクタは connect-{CONNECTOR_NAME} という名前のグループを作成します。コンシューマーグループが存在する限り、同じ CONNECTOR_NAME 値で連続して作成されるシンクコネクタは、最後にコミットされたオフセットから継続されます。

例 : オフセットストレージトピックを指定し、更新された設定を使用してソースコネクタを再作成する

変更データキャプチャ (CDC) コネクタがあり、CDC ストリーム内での位置を見失うことなくコネクタ設定を変更するとします。既存のコネクタ設定を更新することはできませんが、そのコネクタを削除して同じ名前で新しいコネクタ設定を作成することはできます。CDC ストリームのどこから読み取りを開始するかを新しいコネクタに伝えるには、ワーカー設定で古いコネクタのオフセットストレージトピックを指定します。次のステップでこのタスクのやり方を説明します。

  1. クライアントマシンで、次のコマンドを実行してコネクタのオフセットストレージトピックの名前を検索します。<bootstrapBrokerString> をクラスターのブートストラップブローカー文字列に置き換えます。ブートストラップブローカー文字列を取得する手順については、「Amazon MSK クラスターのブートストラップブローカーを取得する」を参照してください。

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    次の出力は、デフォルトの内部コネクタトピックを含むすべてのクラスタートピックのリストを示しています。この例では、既存の CDC コネクタは MSK Connect によって作成されたデフォルトのオフセットストレージトピックを使用します。オフセットストレージトピックが __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 と呼ばれるのはこれが理由です。

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. https://console.aws.amazon.com/msk/ で Amazon MSK コンソールを開きます。

  3. [コネクタ] リストからコネクタを選択します。[コネクタ設定] フィールドの内容をコピーして保存し、内容を変更して新しいコネクタを作成できるようにします。

  4. コネクタを削除するには、[削除] を選択します。テキスト入力フィールドにコネクタ名を入力して、削除を確定します。

  5. 実際のシナリオに合った値を使用してカスタムワーカー設定を作成します。手順については、「カスタムワーカー設定の作成」を参照してください。

    ワーカー設定では、以下の設定のように、以前に offset.storage.topic の値として取得したオフセットストレージトピックの名前を指定する必要があります。

    config.providers.secretManager.param.aws.region=eu-west-3 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. 重要

    新しいコネクタには古いコネクタと同じ名前を付ける必要があります。

    前のステップで設定したワーカー設定を使用して、新しいコネクタを作成します。手順については、「コネクタを作成する」を参照してください。