翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
MSK Connect の EventBridge Kafka シンクコネクタを設定する
このトピックでは、MSK Connect 用の EventBridge Kafka シンクコネクタ
前提条件
コネクタをデプロイする前に、次のリソースがあることを確認してください。
-
Amazon MSK クラスター: Kafka メッセージを生成および消費するためのアクティブな MSK クラスター。
-
Amazon EventBridge イベントバス: Kafka トピックからイベントを受信するための EventBridge イベントバス。
-
IAM ロール: MSK Connect と EventBridge コネクタに必要なアクセス許可を持つ IAM ロールを作成します。
-
MSK Connect または VPC インターフェース エンドポイント から パブリック インターネットへのアクセス (MSK クラスターの VPC およびサブネット内に作成された EventBridge 用)。これにより、NAT ゲートウェイを必要とせずにパブリックインターネットを経由することを回避できます。
-
Amazon EC2 インスタンスや AWS CloudShell
などの クライアントマシン を使用して、トピックを作成し Kafka にレコードを送信します。
MSK Connect に必要なリソースをセットアップする
コネクタの IAM ロールを作成してから、コネクタを作成します。また、EventBridge ルールを作成して、EventBridge イベントバスに送信された Kafka イベントをフィルタリングします。
コネクタの IAM ロール
コネクタに関連付ける IAM ロールには、EventBridge へのイベントの送信を許可する PutEvents アクセス許可が必要です。次の IAM ポリシーの例では、example-event-bus という名前のイベントバスにイベントを送信する許可を付与します。次の例のリソース ARN をイベントバスの ARN に置き換えてください。
さらに、コネクタの IAM ロールに次の信頼ポリシーが含まれていることを確認する必要があります。
受信イベントの EventBridge ルール
受信イベントをイベントパターンと呼ばれるイベントデータ基準と照合するルールを作成します。イベントパターンを使用すると、受信イベントをフィルタリングする基準を定義し、特定のルールをトリガーし、その後指定されたターゲットにルーティングするイベントを決定できます。次のイベントパターンの例は、EventBridge イベントバスに送信された Kafka イベントと一致します。
{ "detail": { "topic": ["msk-eventbridge-tutorial"] } }
以下は、Kafka シンクコネクタを使用して Kafka から EventBridge に送信されるイベントの例です。
{ "version": "0", "id": "dbc1c73a-c51d-0c0e-ca61-ab9278974c57", "account": "123456789012", "time": "2025-03-26T10:15:00Z", "region": "us-east-1", "detail-type": "msk-eventbridge-tutorial", "source": "kafka-connect.msk-eventbridge-tutorial", "resources": [], "detail": { "topic": "msk-eventbridge-tutorial", "partition": 0, "offset": 0, "timestamp": 1742984100000, "timestampType": "CreateTime", "headers": [], "key": "order-1", "value": { "orderItems": [ "item-1", "item-2" ], "orderCreatedTime": "Wed Mar 26 10:15:00 UTC 2025" } } }
EventBridge コンソールで、このパターン例を使用してイベントバスにルールを作成し、CloudWatch Logs グループなどのターゲットを指定します。EventBridge コンソールは、CloudWatch Logs グループに必要なアクセスポリシーを自動的に設定します。
コネクタを作成する
次のセクションでは、 AWS マネジメントコンソールを使用して EventBridge Kafka シンクコネクタ
トピック
ステップ 1: コレクタをダウンロードする
EventBridge Kafka コネクタの GitHub リリースページkafka-eventbridge-sink-with-dependencies.jar を選択してコネクタをダウンロードします。次に、ファイルをマシン上の任意の場所に保存します。
ステップ 1: Amazon S3 バケットを作成する
-
MSK Connect で使用する JAR ファイルを Amazon S3 に保存するには、 を開き AWS マネジメントコンソール、Amazon S3 を選択します。
-
Amazon S3 コンソールで、バケットの作成を選択し、一意のバケット名を入力します。例えば、
amzn-s3-demo-bucket1-eb-connector。 -
Amazon S3 バケットに適したリージョンを選択します。MSK クラスターがデプロイされているリージョンと一致することを確認してください。
-
バケット設定では、デフォルトの選択を保持するか、必要に応じて調整します。
-
[バケットを作成] を選択します。
-
JAR ファイルを Amazon S3 バケットにアップロードします。
ステップ 3: MSK Connect でプラグインを作成する
-
を開き AWS マネジメントコンソール、MSK Connect に移動します。
-
左側のナビゲーションペインで [カスタムプラグイン] を選択します。
-
プラグインの作成を選択し、プラグイン名を入力します。例えば、
eventbridge-sink-plugin。 -
カスタムプラグインの場所の場合は、S3 オブジェクト URL を貼り付けます。
-
プラグインのオプションの説明を追加します。
-
プラグインの作成 を選択します。
プラグインを作成したら、それを使用して MSK Connect で EventBridge Kafka コネクタを設定およびデプロイできます。
ステップ 4: コネクタを作成する
コネクタを作成する前に、コネクタエラーを避けるために必要な Kafka トピックを作成することをお勧めします。トピックを作成するには、クライアントマシンを使用します。
-
MSK コンソールの左側のペインで[コネクタ] を選択し、[コネクタの作成]を選択します。
-
プラグインのリストで、eventbridge-sink-plugin を選択し、Next を選択します。
-
コネクタ名には、
EventBridgeSinkと入力します。 -
クラスターリストから、 お客様の MSK クラスターを選択します。
-
以下のコネクタ設定をコピーし、コネクタ設定フィールドに貼り付けてください
必要に応じて、次の設定のプレースホルダーを置き換えます。
-
MSK クラスターにパブリックインターネットアクセスがある場合は、
aws.eventbridge.endpoint.uriを削除します。 -
PrivateLink を使用して MSK から EventBridge に安全に接続する場合は、
https://の後の DNS 部分を、前に作成した EventBridge の (オプション) VPC インターフェイスエンドポイントの正しいプライベート DNS 名に置き換えます。 -
次の設定の EventBridge イベントバス ARN をイベントバスの ARN に置き換えます。
-
リージョン固有の値を更新します。
{ "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector", "aws.eventbridge.connector.id": "msk-eventbridge-tutorial", "topics": "msk-eventbridge-tutorial", "tasks.max": "1", "aws.eventbridge.endpoint.uri": "https://events.us-east-1.amazonaws.com", "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:123456789012:event-bus/example-event-bus", "value.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "aws.eventbridge.region": "us-east-1", "auto.offset.reset": "earliest", "key.converter": "org.apache.kafka.connect.storage.StringConverter" }コネクタ設定の詳細については、「eventbridge-kafka-connector
」を参照してください。 必要に応じて、ワーカーと自動スケーリングの設定を変更します。また、ドロップダウンから利用可能な最新の (推奨) Apache Kafka Connect バージョンを使用することをお勧めします。アクセス許可で、前に作成したロールを使用します。また、オブザーバビリティとトラブルシューティングのために CloudWatch へのログ記録を有効にすることをお勧めします。必要に応じて、タグなどの他のオプション設定を調整します。次に、コネクタをデプロイし、ステータスが実行中状態になるまで待ちます。
-
Kafka にメッセージを送信する
Apache Avro や JSON などのメッセージエンコーディングを設定するには、 value.converter と、オプションで Kafka Connect で使用できるkey.converter設定を使用して、さまざまなコンバーターを指定します。
このトピックの connector example は、org.apache.kafka.connect.json.JsonConverter の使用で示されているように、value converter 用の JSON エンコードされたメッセージで動作するように設定されています。コネクタが実行中状態になったら、クライアントマシンから msk-eventbridge-tutorial Kafka トピックにレコードを送信します。