步骤 4:在 Amazon MSK 集群中创建主题 - Amazon Managed Streaming for Apache Kafka

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

步骤 4:在 Amazon MSK 集群中创建主题

开始使用 Amazon MSK 的此步骤中,您需在客户端计算机上安装 Apache Kafka 客户端库和工具,然后创建主题。

警告

本教程中使用的 Apache Kafka 版本号仅为示例。建议您使用与 MSK 集群版本相同的客户端版本。较旧的客户端版本可能缺少某些功能和严重的错误修复。

确定您的 MSK 集群版本

  1. https://console.aws.amazon.com/msk/ 打开 Amazon MSK 控制台。

  2. 在导航栏中,选择您创建 MSK 集群的区域。

  3. 选择 MSK 集群。

  4. 请注意集群上所用 Apache Kafka 的版本。

  5. 将本教程中的 Amazon MSK 版本号实例替换为在步骤 3 中获得的版本。

在客户端计算机上创建主题

  1. Connect 连接到您的客户机器。

    1. 打开亚马逊 EC2 控制台,网址为https://console.aws.amazon.com/ec2/

    2. 在导航窗格中,选择 Instances (实例)。然后,选中您在中创建的客户机名称旁边的复选框步骤 3:创建客户端计算机

    3. 选择 Actions (操作),然后选择 Connect (连接)。按照控制台中的说明,连接到您的客户端计算机。

  2. 安装 Java 并设置 Kafka 版本的环境变量。

    1. 通过运行以下命令在客户端计算机上安装 Java。

      sudo yum -y install java-11
    2. 将 MSK 集群的 Kafka 版本存储在环境变量中KAFKA_VERSION,如以下命令所示。在整个设置过程中,你都需要这些信息。

      export KAFKA_VERSION={KAFKA VERSION}

      例如,如果您使用的是 3.6.0 版,请使用以下命令。

      export KAFKA_VERSION=3.6.0
  3. 下载并解压 Apache Kafka。

    1. 运行以下命令以下载 Apache Kafka。

      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz

      例如,如果您的 MSK 集群使用 Apache Kafka 版本 3.6.0,请运行以下命令。

      wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
      注意

      以下列表列出了一些其他的 Kafka 下载信息,如果你遇到任何问题,你可以使用这些信息。

      • 如果您遇到连接问题或想要使用镜像站点,请尝试使用 Apache 镜像选择器,如以下命令所示。

        wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      • 直接从 Apache Kafka 网站下载相应的版本。

    2. 在上一步中将 TAR 文件下载到的目录中运行以下命令。

      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
    3. 将新创建的目录的完整路径存储在KAFKA_ROOT环境变量中。

      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
  4. 为您的 MSK 集群设置身份验证。

    1. $KAFKA_ROOT/libs目录中下载最新版本的 Amazon MSK IAM JAR 文件。使用以下命令下载文件,并{LATEST VERSION}替换为实际版本号。

      cd $KAFKA_ROOT/libs wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      Amazon MSK IAM JAR 文件允许您的客户端计算机使用 IAM 身份验证访问 MSK 集群。

      注意

      在运行任何与你的 MSK 集群交互的 Kafka 命令之前,你可能需要将 Amazon MSK IAM JAR 文件添加到你的 Java 类路径中。设置CLASSPATH环境变量,如以下示例所示。

      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      这将CLASSPATH为整个会话设置,从而使 JAR 可用于所有后续的 Kafka 命令。

    2. 转到$KAFKA_ROOT/config目录创建客户机配置文件。

      cd $KAFKA_ROOT/config
    3. 复制以下属性设置并将其粘贴到新文件中。将该文件保存为 client.properties

      security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  5. (可选)如果您遇到任何与内存相关的问题,或者正在处理大量的主题或分区,则可以调整 Kafka 工具的 Java 堆大小。为此,请在运行 Kafka 命令之前设置KAFKA_HEAP_OPTS环境变量。

    以下示例将最大堆大小和初始堆大小都设置为 512 兆字节。根据您的具体要求和可用系统资源调整这些值。

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  6. 获取您的集群连接信息。

    1. https://console.aws.amazon.com/msk/ 打开 Amazon MSK 控制台。

    2. 等待集群的状态变为活动。这可能需要花几分钟的时间。在状态变为活动后,选择集群名称。这会将您引导至包含集群摘要的页面。

    3. 选择查看客户端信息

    4. 复制私有端点的连接字符串。

      您将为每个经纪人获得三个端点。将其中一个连接字符串存储在环境变量中BOOTSTRAP_SERVER,如以下命令所示。<bootstrap-server-string>替换为连接字符串的实际值。

      export BOOTSTRAP_SERVER=<bootstrap-server-string>
  7. 运行以下命令来创建主题。

    $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic

    如果文件NoSuchFileException为 a,请确保该client.properties文件存在于 Kafka bin 目录中的当前工作目录中。

    注意

    如果您不想为整个会话设置CLASSPATH环境变量,也可以将该CLASSPATH变量作为每个 Kafka 命令的前缀。这种方法仅将类路径应用于该特定命令。

    CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \ $KAFKA_ROOT/bin/kafka-topics.sh --create \ --bootstrap-server $BOOTSTRAP_SERVER \ --command-config $KAFKA_ROOT/config/client.properties \ --replication-factor 3 \ --partitions 1 \ --topic MSKTutorialTopic
  8. (可选)验证主题是否已成功创建。

    1. 如果命令成功,您应该会看到以下消息:Created topic MSKTutorialTopic.

    2. 列出所有主题以确认您的主题存在。

      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties

    如果命令失败或遇到错误,请参见排查 Amazon MSK 集群的问题以获取故障排除信息。

  9. (可选)如果您想在本教程的后续步骤中保留环境变量,请跳过此步骤。否则,您可以取消设置变量,如以下示例所示。

    unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS

下一步

步骤 5:生成和使用数据