

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# MSK Connect ワーカーを理解する
<a name="msk-connect-workers"></a>

ワーカーは、コネクタロジックを実行する Java 仮想マシン (JVM) プロセスです。各ワーカーは、並列スレッドで実行され、データをコピーする作業を行う一連のタスクを作成します。タスクは状態を保存しないため、復元力のあるスケーラブルな Data Pipeline を提供するために、いつでも開始、停止、または再開できます。スケーリングイベントまたは予期しない障害によるワーカー数の変更は、残りのワーカーによって自動的に検出され、残りのワーカーのセット全体でタスクのバランスを取り直すように調整されます。Connect ワーカーは、Apache Kafka のコンシューマーグループを使用して、調整とリバランスを行います。

コネクタの容量要件が可変するか、見積もりが難しい場合は、MSK Connect に、指定した下限と上限の間で必要に応じてワーカー数をスケールさせることができます。または、コネクタロジックを実行するワーカーの正確な数を指定することもできます。詳細については、「[コネクタ容量を理解する](msk-connect-capacity.md)」を参照してください。

**MSK Connect ワーカーによる IP アドレスの消費**  
MSK Connect ワーカーは、カスタマー提供のサブネット内の IP アドレスを消費します。各ワーカーは、カスタマー提供のサブネットの 1 つに含まれている 1 つの IP アドレスを使用します。CreateConnector リクエストに応答して提供されたサブネットに使用可能な IP アドレスが十分にあり、指定した容量を満たしていることを確認する必要があります。特に、ワーカー数が変動する可能性のあるコネクタを自動スケーリングする場合は注意が必要です。

## デフォルトのワーカー構成
<a name="msk-connect-default-worker-config"></a>

MSK Connect は、次のデフォルトのワーカー構成を提供します。

```
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```

# サポートされているワーカー設定プロパティ
<a name="msk-connect-supported-worker-config-properties"></a>

MSK Connect は、デフォルトのワーカー構成を提供します。コネクタで使用するカスタムワーカー設定を作成することもできます。次のリストには、Amazon MSK Connect がサポートする、またはサポートしないワーカー設定プロパティに関する情報が含まれています。
+ `key.converter` プロパティと `value.converter` プロパティが必要です。
+ MSK Connect は、次の `producer.` 設定プロパティをサポートしています。

  ```
  producer.acks
  producer.batch.size
  producer.buffer.memory
  producer.compression.type
  producer.enable.idempotence
  producer.key.serializer
  producer.linger.ms
  producer.max.request.size
  producer.metadata.max.age.ms
  producer.metadata.max.idle.ms
  producer.partitioner.class
  producer.reconnect.backoff.max.ms
  producer.reconnect.backoff.ms
  producer.request.timeout.ms
  producer.retry.backoff.ms
  producer.value.serializer
  ```
+ MSK Connect は、次の `consumer.` 設定プロパティをサポートしています。

  ```
  consumer.allow.auto.create.topics
  consumer.auto.offset.reset
  consumer.check.crcs
  consumer.fetch.max.bytes
  consumer.fetch.max.wait.ms
  consumer.fetch.min.bytes
  consumer.heartbeat.interval.ms
  consumer.key.deserializer
  consumer.max.partition.fetch.bytes
  consumer.max.poll.interval.ms
  consumer.max.poll.records
  consumer.metadata.max.age.ms
  consumer.partition.assignment.strategy
  consumer.reconnect.backoff.max.ms
  consumer.reconnect.backoff.ms
  consumer.request.timeout.ms
  consumer.retry.backoff.ms
  consumer.session.timeout.ms
  consumer.value.deserializer
  ```
+ 次のプロパティを除いて、`producer.` または `consumer.` プレフィックスでスタートしないすべての設定プロパティが許可されます。

  ```
  access.control.
  admin.
  admin.listeners.https.
  client.
  connect.
  inter.worker.
  internal.
  listeners.https.
  metrics.
  metrics.context.
  rest.
  sasl.
  security.
  socket.
  ssl.
  topic.tracking.
  worker.
  bootstrap.servers
  config.storage.topic
  connections.max.idle.ms
  connector.client.config.override.policy
  group.id
  listeners
  metric.reporters
  plugin.path
  receive.buffer.bytes
  response.http.headers.config
  scheduled.rebalance.max.delay.ms
  send.buffer.bytes
  status.storage.topic
  ```

ワーカー設定プロパティとその表現については、Apache Kafka ドキュメントの「[Kafka Connect Config](https://kafka.apache.org/documentation/#connectconfigs)」を参照してください。

# カスタムワーカー設定の作成
<a name="msk-connect-create-custom-worker-config"></a>

この手順では、 AWS マネジメントコンソールを使用してカスタムワーカー設定を作成する方法について説明します。

**を使用したカスタムワーカー設定の作成 AWS マネジメントコンソール**

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk/) で Amazon MSK コンソールを開きます。

1. 左側のペインの **MSK Connect** で、**Worker configurations** (ワーカー構成) を選択します。

1. **Create worker configuration** (ワーカー構成の作成) を選択します。

1. 名前とオプションの説明を入力し、設定するプロパティと値を追加します。

1. **Create worker configuration** (ワーカー構成の作成) を選択します。

MSK Connect API を使用してワーカー構成を作成するには、[CreateWorkerConfiguration](https://docs.aws.amazon.com/MSKC/latest/mskc/API_CreateWorkerConfiguration.html)を参照してください。

# `offset.storage.topic` を使用してソースコネクタオフセットを管理する
<a name="msk-connect-manage-connector-offsets"></a>

このセクションでは、「オフセットストレージトピック」を使用してソースコネクタオフセットを管理するのに役立つ情報を提供します。オフセットストレージトピックは、Kafka Connect がコネクタとタスク設定のオフセットを保存するために使用する内部トピックです。

## 考慮事項
<a name="msk-connect-manage-connector-offsets-considerations"></a>

ソースコネクタオフセットを管理するときは、次の点を考慮してください。
+ オフセットストレージトピックを指定するには、ワーカー設定の `offset.storage.topic` の値として、コネクタオフセットが保存される Kafka トピックの名前を指定します。
+ コネクタ設定を変更する場合は注意が必要です。ソースコネクタがキーオフセットレコードに対して設定の値を使用している場合、設定値を変更すると、コネクタが意図しない動作をする可能性があります。プラグインのドキュメントを参照することをお勧めします。
+ **デフォルトのパーティション数のカスタマイズ** — `offset.storage.topic` の追加によるワーカー設定のカスタマイズに加えて、オフセットとステータスストレージのトピックのパーティション数をカスタマイズできます。内部トピックのデフォルトパーティションは次のとおりです。
  + `config.storage.topic`: 1、設定不可、単一パーティションのトピックである必要がある
  + `offset.storage.topic`: 25、`offset.storage.partitions` を指定することで設定可能
  + `status.storage.topic`: 5、`status.storage.partitions` を指定することで設定可能
+ **トピックの手動削除** — Amazon MSK Connect は、コネクタをデプロイするたびに新しい Kafka Connect 内部トピック (トピック名が `__amazon_msk_connect` で始まる) を作成します。`offset.storage.topic` などの内部トピックはコネクタ間で再利用される可能性があるため、削除されたコネクタにアタッチされた古いトピックは自動的に削除されません。ただし、MSK Connect によって作成された未使用の内部トピックは手動で削除できます。内部トピックには `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` 形式に従って名前が付けられます。

  正規表現 `__amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id` を使用して内部トピックを削除できます。実行中のコネクタによって現在使用されている内部トピックは削除しないでください。
+ **MSK Connect によって作成された内部トピックに同じ名前を使用する** — オフセットストレージトピックを再利用して以前に作成したコネクタのオフセットを利用する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。ワーカー設定を使用して `offset.storage.topic` プロパティを設定し、`offset.storage.topic` に同じ名前を割り当て、異なるコネクタ間で再利用することができます。この設定については、「[コネクタオフセットを管理する](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)」で説明しています。MSK Connect では、異なるコネクタが `config.storage.topic` と `status.storage.topic` を共有することはできません。これらのトピックは、MSKC で新しいコネクタを作成するたびに作成されます。`__amazon_msk_connect_<status|configs>_connector_name_connector_id` 形式に従って自動的に名前が付けられるため、作成するコネクタによって名前が異なります。

# デフォルトのオフセットストレージトピックを使用する
<a name="msk-connect-default-offset-storage-topic"></a>

デフォルトでは、Amazon MSK Connect は、作成したコネクタごとに Kafka クラスターに新しいオフセットストレージトピックを生成します。MSK は、コネクタ ARN の一部を使用してデフォルトのトピック名を作成します。例えば、`__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2`。

# カスタムオフセットストレージトピックを使用する
<a name="msk-connect-set-offset-storage-topic"></a>

ソースコネクタ間のオフセットの連続性を提供するために、デフォルトトピックの代わりに任意のオフセットストレージトピックを使用できます。オフセットストレージトピックを指定すると、前のコネクタの最後のオフセットから読み取りを再開するソースコネクタを作成するといったタスクを実行しやすくなります。

オフセットストレージトピックを指定するには、コネクタを作成する前にワーカー設定で `offset.storage.topic` プロパティの値を指定します。オフセットストレージトピックを再利用して以前に作成したコネクタのオフセットを利用する場合は、新しいコネクタに古いコネクタと同じ名前を付ける必要があります。カスタムオフセットストレージトピックを作成する場合は、トピック設定で [https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy](https://kafka.apache.org/27/documentation.html#topicconfigs_cleanup.policy) を `compact` に設定する必要があります。

**注記**  
*シンク*コネクタの作成時にオフセットストレージトピックを指定すると、トピックがまだ存在しない場合は MSK Connect によってそのトピックが作成されます。ただし、このトピックはコネクタオフセットの保存には使用されません。  
代わりに、シンクコネクタオフセットは Kafka コンシューマーグループプロトコルを使用して管理されます。各シンクコネクタは `connect-{CONNECTOR_NAME}` という名前のグループを作成します。コンシューマーグループが存在する限り、同じ `CONNECTOR_NAME` 値で連続して作成されるシンクコネクタは、最後にコミットされたオフセットから継続されます。

**重要**  
オフセットの継続性を維持しながら既存のコネクタ設定を更新する場合は、UpdateConnector API を使用します。詳細については、「[コネクタを更新する](mkc-update-connector.md)」を参照してください。

**Example : ソースコネクタを再作成するときにオフセットストレージトピックを指定する**  
オフセットの継続性を維持しながらコネクタを削除して再作成する必要がある場合は、ワーカー設定でオフセットストレージトピックを指定できます。たとえば、変更データキャプチャ (CDC) コネクタがあり、CDC ストリーム内の場所を失うことなく再作成するとします。次のステップでこのタスクのやり方を説明します。  

1. クライアントマシンで、次のコマンドを実行してコネクタのオフセットストレージトピックの名前を検索します。`<bootstrapBrokerString>` をクラスターのブートストラップブローカー文字列に置き換えます。ブートストラップブローカー文字列を取得する手順については、「[Amazon MSK クラスターのブートストラップブローカーを取得する](msk-get-bootstrap-brokers.md)」を参照してください。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>
   ```

   次の出力は、デフォルトの内部コネクタトピックを含むすべてのクラスタートピックのリストを示しています。この例では、既存の CDC コネクタは MSK Connect によって作成された[デフォルトのオフセットストレージトピック](msk-connect-default-offset-storage-topic.md)を使用します。オフセットストレージトピックが `__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2` と呼ばれるのはこれが理由です。

   ```
   __consumer_offsets
   __amazon_msk_canary
   __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   my-msk-topic-1
   my-msk-topic-2
   ```

1. [https://console.aws.amazon.com/msk/](https://console.aws.amazon.com/msk) で Amazon MSK コンソールを開きます。

1. **[コネクタ]** リストからコネクタを選択します。**[コネクタ設定]** フィールドの内容をコピーして保存し、内容を変更して新しいコネクタを作成できるようにします。

1. コネクタを削除するには、**[削除]** を選択します。テキスト入力フィールドにコネクタ名を入力して、削除を確定します。

1. 実際のシナリオに合った値を使用してカスタムワーカー設定を作成します。手順については、「[カスタムワーカー設定の作成](msk-connect-create-custom-worker-config.md)」を参照してください。

   ワーカー設定では、以下の設定のように、以前に `offset.storage.topic` の値として取得したオフセットストレージトピックの名前を指定する必要があります。

   ```
   config.providers.secretManager.param.aws.region=eu-west-3
   key.converter=<org.apache.kafka.connect.storage.StringConverter>
   value.converter=<org.apache.kafka.connect.storage.StringConverter>
   config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
   config.providers=secretManager
   offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
   ```

1. 
**重要**  
新しいコネクタには古いコネクタと同じ名前を付ける必要があります。

   前のステップで設定したワーカー設定を使用して、新しいコネクタを作成します。手順については、「[コネクタを作成する](mkc-create-connector-intro.md)」を参照してください。