ステップ 4: Amazon MSK クラスターにトピックを作成する - Amazon Managed Streaming for Apache Kafka

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 4: Amazon MSK クラスターにトピックを作成する

Amazon MSK の使用をスタートするのこのステップでは、Apache Kafka クライアントライブラリとツールをクライアントマシンにインストールしてから、トピックを作成します。

警告

このチュートリアルで使用している Apache Kafka のバージョン番号は例にすぎません。MSK クラスターバージョンと同じバージョンのクライアントを使用することが推奨されます。古いクライアントバージョンでは、特定の機能や重大なバグ修正が行われていない可能性があります。

MSK クラスターのバージョンの確認

  1. https://console.aws.amazon.com/msk/ で Amazon MSK コンソールを開きます。

  2. ナビゲーションバーで、MSK クラスターを作成したリージョンを選択します。

  3. MSK クラスターを選択します。

  4. クラスターで使用されている Apache Kafka のバージョンをメモします。

  5. このチュートリアルの Amazon MSK バージョン番号のインスタンスは、ステップ 3 で取得したバージョンに置き換えてください。

クライアントマシンでのトピックの作成

  1. クライアントマシンに接続します。

    1. Amazon EC2 コンソールの https://console.aws.amazon.com/ec2/ を開いてください。

    2. ナビゲーションペインで、[インスタンス] を選択してください。次に、 で作成したクライアントマシンの名前の横にあるチェックボックスをオンにしますステップ 3: クライアントマシンを作成する

    3. [Actions (アクション)] を選択して、[Connect (接続)] を選択します。コンソールの指示に従ってクライアントマシンに接続します。

  2. Java をインストールし、Kafka バージョン環境変数を設定します。

    1. 次のコマンドを実行して、クライアントマシンに Java をインストールします。

      sudo yum -y install java-11
    2. 次のコマンドKAFKA_VERSIONに示すように、MSK クラスターの Kafka バージョンを環境変数 に保存します。この情報は、セットアップ全体で必要になります。

      export KAFKA_VERSION={KAFKA VERSION}

      たとえば、バージョン 3.6.0 を使用している場合は、次のコマンドを使用します。

      export KAFKA_VERSION=3.6.0
  3. Apache Kafka をダウンロードして抽出します。

    1. 次のコマンドを実行して、Apache Kafka をダウンロードします。

      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      注記

      次のリストは、問題が発生した場合に使用できる代替 Kafka ダウンロード情報を示しています。

      • 接続の問題が発生した場合、またはミラーサイトを使用する場合は、次のコマンドに示すように、Apache ミラーセレクタを試してください。

        wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      • Apache Kafka ウェブサイトから直接適切なバージョンをダウンロードします。

    2. 前のステップで TAR ファイルをダウンロードしたディレクトリで次のコマンドを実行します。

      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
    3. 新しく作成したディレクトリへのフルパスをKAFKA_ROOT環境変数内に保存します。

      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
  4. MSK クラスターの認証を設定します。

    1. Amazon MSK IAM クライアントライブラリの最新バージョンを検索します。このライブラリを使用すると、クライアントマシンは IAM 認証を使用して MSK クラスターにアクセスできます。

    2. 次のコマンドを使用して、 $KAFKA_ROOT/libs ディレクトリに移動し、前のステップで見つけた関連する Amazon MSK IAM JAR をダウンロードします。{LATEST VERSION} は、ダウンロードする実際のバージョン番号に置き換えてください。

      cd $KAFKA_ROOT/libs
      wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar
      注記

      MSK クラスターとやり取りする Kafka コマンドを実行する前に、Java クラスパスに Amazon MSK IAM JAR ファイルを追加する必要がある場合があります。次の例に示すように、 CLASSPATH環境変数を設定します。

      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      これにより、セッション全体の が設定され、後続のすべての Kafka コマンドで JAR を使用CLASSPATHできるようになります。

    3. $KAFKA_ROOT/config ディレクトリに移動して、クライアント設定ファイルを作成します。

      cd $KAFKA_ROOT/config
    4. 次のプロパティ設定をコピーして、新しいファイルに貼り付けます。client.properties という名前でファイルを保存します。

      security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  5. (オプション) Kafka ツールの Java ヒープサイズを調整します。

    メモリ関連の問題が発生した場合、または多数のトピックやパーティションを使用している場合は、Java ヒープサイズを調整できます。これを行うには、Kafka コマンドを実行する前に KAFKA_HEAP_OPTS環境変数を設定します。

    次の例では、最大ヒープサイズと初期ヒープサイズの両方を 512 メガバイトに設定します。これらの値は、特定の要件と使用可能なシステムリソースに従って調整します。

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  6. クラスター接続情報を取得します。

    1. https://console.aws.amazon.com/msk/ で Amazon MSK コンソールを開きます。

    2. クラスターのステータスが [アクティブ] になるまで待ちます。この処理には数分かかることがあります。ステータスが [アクティブ] になったら、クラスター名を選択します。これにより、そのクラスターの概要を含むページに移動します。

    3. View client information (ライアント情報の表示) を選択します。

    4. プライベートエンドポイントの接続文字列をコピーします。

      ブローカーごとに 3 つのエンドポイントを取得します。次のコマンドに示すようにBOOTSTRAP_SERVER、これらの接続文字列の 1 つを環境変数 に保存します。<bootstrap-server-string> を接続文字列の実際の値に置き換えます。

      export BOOTSTRAP_SERVER=<bootstrap-server-string>
  7. 次のコマンドを実行して、 トピックを作成します。

    $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic

    NoSuchFileException client.properties ファイルの を取得した場合は、このファイルが Kafka bin ディレクトリ内の現在の作業ディレクトリに存在することを確認してください。

    注記

    セッション全体にCLASSPATH環境変数を設定しない場合は、代わりに各 Kafka コマンドに CLASSPATH変数をプレフィックスできます。このアプローチは、その特定のコマンドにのみクラスパスを適用します。

    CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \ $KAFKA_ROOT/bin/kafka-topics.sh --create \ --bootstrap-server $BOOTSTRAP_SERVER \ --command-config $KAFKA_ROOT/config/client.properties \ --replication-factor 3 \ --partitions 1 \ --topic MSKTutorialTopic
  8. (オプション) トピックが正常に作成されたことを確認します。

    1. コマンドが成功すると、次のメッセージが表示されます。 Created topic MSKTutorialTopic.

    2. すべてのトピックを一覧表示して、トピックが存在することを確認します。

      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties

    コマンドが失敗した場合、またはエラーが発生した場合は、トラブルシューティング情報Amazon MSK クラスターをトラブルシューティングするについて「」を参照してください。

  9. (オプション) このチュートリアルで使用した環境変数を削除します。

    このチュートリアルの次のステップで環境変数を保持する場合は、このステップをスキップします。それ以外の場合は、次の例に示すように、これらの変数の設定を解除できます。

    unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS

次のステップ

ステップ 5: データを生成および消費する