步驟 4:在 Amazon MSK 叢集中建立主題 - Amazon Managed Streaming for Apache Kafka

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 4:在 Amazon MSK 叢集中建立主題

開始使用 Amazon MSK 的這個步驟中,您可以在用戶端機器上安裝 Apache Kafka 用戶端程式庫和工具,然後建立主題。

警告

本教學課程中使用的 Apache Kafka 版本號僅為示例。我們建議您使用與 MSK 叢集版本相同的用戶端版本。較舊的用戶端版本可能缺少某些功能和重要的錯誤修正。

判斷您的 MSK 叢集版本

  1. 開啟位於 https://console.aws.amazon.com/msk/ 的 Amazon MSK 主控台。

  2. 在導覽列中,選擇您建立 MSK 叢集的區域。

  3. 選擇 MSK 叢集。

  4. 請記住叢集上使用的 Apache Kafka 版本。

  5. 使用步驟 3 中取得的版本取代本教學課程中 Amazon MSK 版本號碼的執行個體。

在用戶端機器上建立主題

  1. 連線至您的用戶端機器。

    1. 前往 https://console.aws.amazon.com/ec2/ 開啟 Amazon EC2 主控台。

    2. 在導覽窗格中,選擇執行個體。然後,選取您在 中建立的用戶端機器名稱旁的核取方塊步驟 3:建立用戶端機器

    3. 選擇動作,然後選擇連線。遵循主控台的指示操作,連線至您的用戶端機器。

  2. 安裝 Java 並設定 Kafka 版本環境變數。

    1. 執行下列命令,在用戶端電腦上安裝 Java。

      sudo yum -y install java-11
    2. 將 MSK 叢集的 Kafka 版本存放在環境變數 中KAFKA_VERSION,如下列命令所示。在整個設定過程中,您將需要此資訊。

      export KAFKA_VERSION={KAFKA VERSION}

      例如,如果您使用的是 3.6.0 版,請使用下列命令。

      export KAFKA_VERSION=3.6.0
  3. 下載並擷取 Apache Kafka。

    1. 執行下列命令下載 Apache Kafka。

      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz

      例如,如果您的 MSK 叢集使用 Apache Kafka 3.6.0 版,請執行下列命令。

      wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
      注意

      以下清單提供一些替代的 Kafka 下載資訊,供您在遇到任何問題時使用。

      • 如果您遇到連線問題或想要使用鏡像網站,請嘗試使用 Apache 鏡像選取器,如下列命令所示。

        wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      • 直接從 Apache Kafka 網站下載適當的版本。

    2. 在您在先前步驟中下載 TAR 檔案的目錄中執行下列命令。

      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
    3. 將新建立目錄的完整路徑存放在KAFKA_ROOT環境變數內。

      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
  4. 設定 MSK 叢集的身分驗證。

    1. $KAFKA_ROOT/libs目錄中下載最新版本的 Amazon MSK IAM JAR 檔案。使用下列命令下載 檔案,並以實際版本編號取代 {LATEST VERSION}

      cd $KAFKA_ROOT/libs wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      Amazon MSK IAM JAR 檔案可讓您的用戶端機器使用 IAM 身分驗證存取 MSK 叢集。

      注意

      在執行與 MSK 叢集互動的任何 Kafka 命令之前,您可能需要將 Amazon MSK IAM JAR 檔案新增至 Java classpath。設定CLASSPATH環境變數,如下列範例所示。

      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      這會CLASSPATH為您的整個工作階段設定 ,讓 JAR 可供所有後續的 Kafka 命令使用。

    2. 前往 $KAFKA_ROOT/config目錄以建立用戶端組態檔案。

      cd $KAFKA_ROOT/config
    3. 複製下列屬性設定,並將其貼入新檔案。儲存檔案為 client.properties

      security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  5. (選用) 如果您遇到任何記憶體相關問題或正在使用大量主題或分割區,您可以調整 Kafka 工具的 Java 堆積大小。若要這樣做,請在執行 Kafka 命令之前設定 KAFKA_HEAP_OPTS 環境變數。

    下列範例會將最大和初始堆積大小設定為 512 MB。根據您的特定需求和可用的系統資源來調整這些值。

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  6. 取得叢集連線資訊。

    1. 開啟位於 https://console.aws.amazon.com/msk/ 的 Amazon MSK 主控台。

    2. 等待叢集的狀態變成作用中。這可能需要幾分鐘的時間。狀態變為作用中之後,選擇叢集名稱。這會帶您前往包含叢集摘要的頁面。

    3. 選擇檢視用戶端資訊

    4. 複製私有端點的連線字串。

      每個代理程式都會有三個端點。將其中一個連線字串存放在環境變數 中BOOTSTRAP_SERVER,如下列命令所示。將 <bootstrap-server-string> 取代為連線字串的實際值。

      export BOOTSTRAP_SERVER=<bootstrap-server-string>
  7. 執行下列命令來建立 主題。

    $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic

    如果您取得 client.properties 檔案NoSuchFileException的 ,請確定此檔案存在於 Kafka bin 目錄中的目前工作目錄中。

    注意

    如果您不想為整個工作階段設定CLASSPATH環境變數,也可以在每個 Kafka 命令前面加上CLASSPATH變數。此方法只會將 classpath 套用至該特定命令。

    CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \ $KAFKA_ROOT/bin/kafka-topics.sh --create \ --bootstrap-server $BOOTSTRAP_SERVER \ --command-config $KAFKA_ROOT/config/client.properties \ --replication-factor 3 \ --partitions 1 \ --topic MSKTutorialTopic
  8. (選用) 確認主題已成功建立。

    1. 如果命令成功,您應該會看到下列訊息: Created topic MSKTutorialTopic.

    2. 列出所有主題以確認您的主題存在。

      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties

    如果命令失敗或您遇到錯誤,請參閱 對 Amazon MSK 叢集進行故障診斷 以取得故障診斷資訊。

  9. (選用) 如果您想要保留環境變數以進行本教學課程的後續步驟,請略過此步驟。否則,您可以取消設定變數,如下列範例所示。

    unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS

後續步驟

步驟 5:產生和取用資料