本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
步骤 4:在 Amazon MSK 集群中创建主题
在开始使用 Amazon MSK 的此步骤中,您需在客户端计算机上安装 Apache Kafka 客户端库和工具,然后创建主题。
警告
本教程中使用的 Apache Kafka 版本号仅为示例。建议您使用与 MSK 集群版本相同的客户端版本。较旧的客户端版本可能缺少某些功能和严重的错误修复。
确定您的 MSK 集群版本
在 https://console.aws.amazon.com/msk/
打开 Amazon MSK 控制台。 -
在导航栏中,选择您创建 MSK 集群的区域。
选择 MSK 集群。
请注意集群上所用 Apache Kafka 的版本。
将本教程中的 Amazon MSK 版本号实例替换为在步骤 3 中获得的版本。
在客户端计算机上创建主题
-
Connect 连接到您的客户机器。
打开亚马逊 EC2 控制台,网址为https://console.aws.amazon.com/ec2/
。 -
在导航窗格中,选择 Instances (实例)。然后,选中您在中创建的客户机名称旁边的复选框步骤 3:创建客户端计算机。
-
选择 Actions (操作),然后选择 Connect (连接)。按照控制台中的说明,连接到您的客户端计算机。
-
安装 Java 并设置 Kafka 版本的环境变量。
-
通过运行以下命令在客户端计算机上安装 Java。
sudo yum -y install java-11
-
将 MSK 集群的 Kafka 版本存储在环境变量中
KAFKA_VERSION
,如以下命令所示。在整个设置过程中,你都需要这些信息。export KAFKA_VERSION=
{KAFKA VERSION}
例如,如果您使用的是 3.6.0 版,请使用以下命令。
export KAFKA_VERSION=3.6.0
-
-
下载并解压 Apache Kafka。
-
运行以下命令以下载 Apache Kafka。
wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
例如,如果您的 MSK 集群使用 Apache Kafka 版本 3.6.0,请运行以下命令。
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
注意
以下列表列出了一些其他的 Kafka 下载信息,如果你遇到任何问题,你可以使用这些信息。
-
如果您遇到连接问题或想要使用镜像站点,请尝试使用 Apache 镜像选择器,如以下命令所示。
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
-
直接从 Apache Kafka
网站下载相应的版本。
-
-
在上一步中将 TAR 文件下载到的目录中运行以下命令。
tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
-
将新创建的目录的完整路径存储在
KAFKA_ROOT
环境变量中。export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
-
-
为您的 MSK 集群设置身份验证。
-
在
$KAFKA_ROOT/libs
目录中下载最新版本的 Amazon MSK IAM JAR 文件。使用以下命令下载文件,并{LATEST VERSION}
替换为实际版本号。cd $KAFKA_ROOT/libs wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-
{LATEST VERSION}
-all.jarAmazon MSK IAM JAR 文件允许您的客户端计算机使用 IAM 身份验证访问 MSK 集群。
注意
在运行任何与你的 MSK 集群交互的 Kafka 命令之前,你可能需要将 Amazon MSK IAM JAR 文件添加到你的 Java 类路径中。设置
CLASSPATH
环境变量,如以下示例所示。export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-
{LATEST VERSION}
-all.jar这将
CLASSPATH
为整个会话设置,从而使 JAR 可用于所有后续的 Kafka 命令。 -
转到
$KAFKA_ROOT/config
目录创建客户机配置文件。cd $KAFKA_ROOT/config
-
复制以下属性设置并将其粘贴到新文件中。将该文件保存为
client.properties
。security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
-
-
(可选)如果您遇到任何与内存相关的问题,或者正在处理大量的主题或分区,则可以调整 Kafka 工具的 Java 堆大小。为此,请在运行 Kafka 命令之前设置
KAFKA_HEAP_OPTS
环境变量。以下示例将最大堆大小和初始堆大小都设置为 512 兆字节。根据您的具体要求和可用系统资源调整这些值。
export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
-
获取您的集群连接信息。
在 https://console.aws.amazon.com/msk/
打开 Amazon MSK 控制台。 -
等待集群的状态变为活动。这可能需要花几分钟的时间。在状态变为活动后,选择集群名称。这会将您引导至包含集群摘要的页面。
-
选择查看客户端信息。
-
复制私有端点的连接字符串。
您将为每个经纪人获得三个端点。将其中一个连接字符串存储在环境变量中
BOOTSTRAP_SERVER
,如以下命令所示。<bootstrap-server-string>
替换为连接字符串的实际值。export BOOTSTRAP_SERVER=
<bootstrap-server-string>
-
运行以下命令来创建主题。
$KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
如果文件
NoSuchFileException
为 a,请确保该client.properties
文件存在于 Kafka bin 目录中的当前工作目录中。注意
如果您不想为整个会话设置
CLASSPATH
环境变量,也可以将该CLASSPATH
变量作为每个 Kafka 命令的前缀。这种方法仅将类路径应用于该特定命令。CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-
{LATEST VERSION}
-all.jar \ $KAFKA_ROOT/bin/kafka-topics.sh --create \ --bootstrap-server $BOOTSTRAP_SERVER \ --command-config $KAFKA_ROOT/config/client.properties \ --replication-factor 3 \ --partitions 1 \ --topic MSKTutorialTopic -
(可选)验证主题是否已成功创建。
-
如果命令成功,您应该会看到以下消息:
Created topic MSKTutorialTopic.
-
列出所有主题以确认您的主题存在。
$KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties
如果命令失败或遇到错误,请参见排查 Amazon MSK 集群的问题以获取故障排除信息。
-
-
(可选)如果您想在本教程的后续步骤中保留环境变量,请跳过此步骤。否则,您可以取消设置变量,如以下示例所示。
unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS
下一步