步驟 4:在 Amazon MSK 叢集中建立主題 - Amazon Managed Streaming for Apache Kafka

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 4:在 Amazon MSK 叢集中建立主題

開始使用 Amazon MSK 的這個步驟中,您可以使用兩種方法之一建立主題:搭配 CreateTopic API 使用原生 AWS 工具,或在用戶端機器上使用 Apache Kafka AdminClient 工具。

警告

搭配 CreateTopic API 使用 AWS 工具時,請確認您的叢集符合需求。如需詳細資訊,請參閱使用主題 APIs的需求

警告

使用 AdminClient 方法時,本教學中使用的 Apache Kafka 版本編號僅為範例。我們建議您使用與 MSK 叢集版本相同的用戶端版本。較舊的用戶端版本可能缺少某些功能和重要的錯誤修正。

使用 AWS 工具建立主題

您可以使用 CLI、SDK 或 AWS 管理主控台等 AWS 工具,在 MSK AWS 叢集中建立主題。 AWS SDKs 此方法提供簡化的方式來管理主題,而不需要直接存取 Kafka 用戶端工具。

如需使用 AWS 工具建立主題的詳細資訊,請參閱 CreateTopic API 開發人員指南

判斷您的 MSK 叢集版本

  1. 開啟位於 https://console.aws.amazon.com/msk/ 的 Amazon MSK 主控台。

  2. 在導覽列中,選擇您建立 MSK 叢集的區域。

  3. 選擇 MSK 叢集。

  4. 請記住叢集上使用的 Apache Kafka 版本。

  5. 使用步驟 3 中取得的版本取代本教學課程中 Amazon MSK 版本號碼的執行個體。

在用戶端機器上建立主題

  1. 連接至您的用戶端機器。

    1. 前往 https://console.aws.amazon.com/ec2/ 開啟 Amazon EC2 主控台。

    2. 在導覽窗格中,選擇執行個體。然後,選取您在 中建立的用戶端機器名稱旁的核取方塊步驟 3:建立用戶端機器

    3. 選擇動作,然後選擇連線。遵循主控台的指示操作,連線至您的用戶端機器。

  2. 安裝 Java 並設定 Kafka 版本環境變數。

    1. 執行下列命令,在用戶端電腦上安裝 Java。

      sudo yum -y install java-11
    2. 將 MSK 叢集的 Kafka 版本存放在環境變數 中KAFKA_VERSION,如下列命令所示。在整個設定過程中,您將需要此資訊。

      export KAFKA_VERSION={KAFKA VERSION}

      例如,如果您使用的是 3.6.0 版,請使用下列命令。

      export KAFKA_VERSION=3.6.0
  3. 下載並擷取 Apache Kafka。

    1. 執行下列命令下載 Apache Kafka。

      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      注意

      以下清單提供一些替代的 Kafka 下載資訊,供您在遇到任何問題時使用。

      • 如果您遇到連線問題或想要使用鏡像網站,請嘗試使用 Apache 鏡像選取器,如下列命令所示。

        wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      • 直接從 Apache Kafka 網站下載適當的版本。

    2. 在您在先前步驟中下載 TAR 檔案的目錄中執行下列命令。

      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
    3. 將新建立目錄的完整路徑存放在KAFKA_ROOT環境變數中。

      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
  4. 為您的 MSK 叢集設定身分驗證。

    1. 尋找最新版本的 Amazon MSK IAM 用戶端程式庫。此程式庫可讓您的用戶端機器使用 IAM 身分驗證存取 MSK 叢集。

    2. 使用下列命令,導覽至 $KAFKA_ROOT/libs目錄,並下載您在上一個步驟中找到的相關 Amazon MSK IAM JAR。請務必將 {LATEST VERSION} 取代為您下載的實際版本編號。

      cd $KAFKA_ROOT/libs
      wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar
      注意

      在執行與 MSK 叢集互動的任何 Kafka 命令之前,您可能需要將 Amazon MSK IAM JAR 檔案新增至 Java classpath。設定CLASSPATH環境變數,如下列範例所示。

      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      這會CLASSPATH為您的整個工作階段設定 ,讓 JAR 可供所有後續的 Kafka 命令使用。

    3. 前往 $KAFKA_ROOT/config目錄以建立用戶端組態檔案。

      cd $KAFKA_ROOT/config
    4. 複製下列屬性設定,並將其貼入新檔案。儲存檔案為 client.properties

      security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  5. (選用) 調整 Kafka 工具的 Java 堆積大小。

    如果您遇到任何記憶體相關問題,或正在使用大量主題或分割區,您可以調整 Java 堆積大小。若要這樣做,請在執行 Kafka 命令之前設定 KAFKA_HEAP_OPTS 環境變數。

    下列範例會將最大和初始堆積大小設定為 512 MB。根據您的特定需求和可用的系統資源來調整這些值。

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  6. 取得叢集連線資訊。

    1. 開啟位於 https://console.aws.amazon.com/msk/ 的 Amazon MSK 主控台。

    2. 等待叢集的狀態變成作用中。這可能需要幾分鐘的時間。狀態變為作用中之後,選擇叢集名稱。這會帶您前往包含叢集摘要的頁面。

    3. 選擇檢視用戶端資訊

    4. 複製私有端點的連線字串。

      每個代理程式都會有三個端點。將其中一個連線字串存放在環境變數 中BOOTSTRAP_SERVER,如下列命令所示。將 <bootstrap-server-string> 取代為連線字串的實際值。

      export BOOTSTRAP_SERVER=<bootstrap-server-string>
  7. 執行下列命令來建立 主題。

    $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic

    如果您取得 client.properties 檔案NoSuchFileException的 ,請確定此檔案存在於 Kafka bin 目錄中的目前工作目錄中。

    注意

    如果您不想為整個工作階段設定CLASSPATH環境變數,您也可以使用CLASSPATH變數為每個 Kafka 命令加上字首。此方法只會將 classpath 套用至該特定命令。

    CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \ $KAFKA_ROOT/bin/kafka-topics.sh --create \ --bootstrap-server $BOOTSTRAP_SERVER \ --command-config $KAFKA_ROOT/config/client.properties \ --replication-factor 3 \ --partitions 1 \ --topic MSKTutorialTopic
  8. (選用) 確認主題已成功建立。

    1. 如果命令成功,您應該會看到下列訊息: Created topic MSKTutorialTopic.

    2. 列出所有主題,以確認您的主題存在。

      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties

    如果命令失敗或您遇到錯誤,請參閱 對 Amazon MSK 叢集進行故障診斷 以取得故障診斷資訊。

  9. (選用) 刪除您在本教學課程中使用的環境變數。

    如果您想要保留環境變數以進行本教學課程中的後續步驟,請略過此步驟。否則,您可以取消設定這些變數,如下列範例所示。

    unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS

後續步驟

步驟 5:產生和取用資料