Step 4: Create a topic in the Amazon MSK cluster - Amazon Managed Streaming for Apache Kafka

Step 4: Create a topic in the Amazon MSK cluster

In this step of Getting Started Using Amazon MSK, you install Apache Kafka client libraries and tools on the client machine, and then you create a topic.

Warning

Apache Kafka version numbers used in this tutorial are examples only. We recommend that you use a client version that matches your MSK cluster version. An older client version might be missing certain features and critical bug fixes.

To find the version of your MSK cluster
  1. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

  2. In the navigation bar, choose the Region where you created the MSK cluster.

  3. Choose the MSK cluster.

  4. Note the version of Apache Kafka used on the cluster.

  5. Replace the Apache Kafka version numbers in this tutorial with the version that you noted in step 4.
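If you have the AWS CLI installed and configured, you might also read the version from the command line. The following is a minimal sketch for a provisioned cluster. It assumes a hypothetical CLUSTER_ARN variable that holds your cluster's ARN, and it falls back to a version that you noted in the console when the CLI or the ARN is unavailable. The helper name msk_kafka_version is illustrative only.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints the Apache Kafka version of an MSK cluster.
# Assumes the AWS CLI is configured and CLUSTER_ARN holds the cluster ARN.
# Falls back to the first argument (a version you noted in the console).
msk_kafka_version() {
  local fallback="$1"
  if [ -n "${CLUSTER_ARN:-}" ] && command -v aws >/dev/null 2>&1; then
    # DescribeCluster reports the version under CurrentBrokerSoftwareInfo.
    aws kafka describe-cluster \
      --cluster-arn "$CLUSTER_ARN" \
      --query 'ClusterInfo.CurrentBrokerSoftwareInfo.KafkaVersion' \
      --output text
  else
    echo "$fallback"
  fi
}

# Example: use the version noted in the console when CLUSTER_ARN is unset.
KAFKA_VERSION="$(msk_kafka_version 3.5.1)"
echo "Using Apache Kafka version $KAFKA_VERSION"
```

You can then substitute "$KAFKA_VERSION" wherever this tutorial shows {YOUR MSK VERSION}.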

To create a topic on the client machine
  1. Open the Amazon EC2 console at https://console.aws.amazon.com/ec2/.

  2. In the navigation pane, choose Instances. Then select the check box beside the name of the client machine that you created in Step 3: Create a client machine.

  3. Choose Actions, and then choose Connect. Follow the instructions in the console to connect to your client machine.

  4. Install Java on the client machine by running the following command:

    sudo yum -y install java-11
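Before you continue, you might confirm that the tools the following steps rely on are available. The sketch below uses a hypothetical require_cmd helper to check for java, wget, and tar on your PATH and to report any that are missing.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeeds if every named command is on the PATH,
# printing each missing command to stderr.
require_cmd() {
  local missing=0 cmd
  for cmd in "$@"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "missing: $cmd" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example: check the tools used in the following steps.
if require_cmd java wget tar; then
  java -version   # prints the installed Java version (to stderr)
else
  echo "Install the missing tools before continuing." >&2
fi
```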
  5. Run the following command to download Apache Kafka.

    wget https://archive.apache.org/dist/kafka/{YOUR MSK VERSION}/kafka_2.13-{YOUR MSK VERSION}.tgz

    For example, if your MSK cluster uses Apache Kafka version 3.5.1, run the following command.

    wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz
    Note

    If you have trouble downloading Kafka, you can use one of the following alternatives.

    • If you want to use a mirror site other than the one used in this command, you can choose a different one on the Apache website.

    • If you encounter connectivity issues, try using the Apache mirror selector, as shown in the following command.

      wget https://www.apache.org/dyn/closer.cgi?path=/kafka/{YOUR MSK VERSION}/kafka_2.13-{YOUR MSK VERSION}.tgz
    • Download the matching version directly from the downloads page on the Apache Kafka website.
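The download URL in this step follows a fixed pattern, so you might build it from the version number and then check that the saved file really is an archive (a mirror page saved as HTML by mistake would not be). The following is a minimal sketch with hypothetical helper names; it assumes the Scala 2.13 build (kafka_2.13-<version>.tgz) that this tutorial uses.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for downloading Kafka. They assume the Scala 2.13
# build (kafka_2.13-<version>.tgz) that this tutorial uses.

# Prints the Apache archive URL for a given Kafka version.
kafka_download_url() {
  local version="$1"
  echo "https://archive.apache.org/dist/kafka/${version}/kafka_2.13-${version}.tgz"
}

# Succeeds only if the file is a readable gzip-compressed tar archive.
is_valid_tgz() {
  tar -tzf "$1" >/dev/null 2>&1
}

# Example usage (requires network access):
#   KAFKA_VERSION=3.5.1
#   wget "$(kafka_download_url "$KAFKA_VERSION")"
#   is_valid_tgz "kafka_2.13-${KAFKA_VERSION}.tgz" || echo "Download is not a valid archive"
```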

  6. In the directory where you downloaded the TAR file in the previous step, run the following command to extract it.

    tar -xzf kafka_2.13-{YOUR MSK VERSION}.tgz
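Later steps refer to <path-to-your-kafka-installation>. The following sketch, with a hypothetical extract_kafka helper, extracts the archive and prints the resulting installation directory so that you can store it in a variable (here called KAFKA_HOME, also a name chosen for illustration).

```shell
#!/usr/bin/env bash
# Hypothetical helper: extracts kafka_2.13-<version>.tgz from the current
# directory into a destination directory and prints the installation path.
extract_kafka() {
  local version="$1" dest="${2:-$PWD}"
  local archive="kafka_2.13-${version}.tgz"
  tar -xzf "$archive" -C "$dest" || return 1
  echo "${dest}/kafka_2.13-${version}"
}

# Example (run in the download directory):
#   KAFKA_HOME="$(extract_kafka 3.5.1)"
#   ls "$KAFKA_HOME/bin/kafka-topics.sh"
```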
  7. Go to the kafka_2.13-{YOUR MSK VERSION}/libs directory, then run the following command to download the Amazon MSK IAM JAR file. The Amazon MSK IAM JAR makes it possible for the client machine to access the cluster.

    wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.3.0/aws-msk-iam-auth-2.3.0-all.jar

    To download a newer release of aws-msk-iam-auth-*-all.jar, replace both occurrences of 2.3.0 in this command with the version number listed on the aws-msk-iam-auth releases page on GitHub.

    Note

    Before running any Kafka commands that interact with your MSK cluster, you must add the Amazon MSK IAM JAR file to your Java classpath. This JAR enables IAM authentication between your client and the MSK cluster. Set the CLASSPATH environment variable, as shown in the following example.

    export CLASSPATH=<path-to-your-kafka-installation>/libs/aws-msk-iam-auth-2.3.0-all.jar

    For example, if you downloaded and extracted Kafka 3.5.1 in the ec2-user home directory, run the following command.

    export CLASSPATH=/home/ec2-user/kafka_2.13-3.5.1/libs/aws-msk-iam-auth-2.3.0-all.jar

    This sets CLASSPATH for the rest of your current shell session, which makes the JAR available to all subsequent Kafka commands that you run in that session.
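If you change the JAR version or install location, a small check can catch a missing download before Kafka reports a class-loading error. The following is a minimal sketch with hypothetical helper names: iam_jar_url follows the URL pattern of the v2.3.0 download shown in this step, and set_msk_classpath exports CLASSPATH only if a matching JAR exists in <kafka-home>/libs (taking the first match alphabetically if there are several).

```shell
#!/usr/bin/env bash
# Hypothetical helpers for the Amazon MSK IAM JAR.

# Prints the GitHub release URL of the aws-msk-iam-auth "all" JAR for a
# version, following the pattern of the v2.3.0 URL in this step.
iam_jar_url() {
  local v="$1"
  echo "https://github.com/aws/aws-msk-iam-auth/releases/download/v${v}/aws-msk-iam-auth-${v}-all.jar"
}

# Exports CLASSPATH pointing at the aws-msk-iam-auth JAR in <kafka-home>/libs.
# Returns 1 (and exports nothing) if no such JAR is present.
set_msk_classpath() {
  local kafka_home="$1" jar
  for jar in "$kafka_home"/libs/aws-msk-iam-auth-*-all.jar; do
    if [ -f "$jar" ]; then
      export CLASSPATH="$jar"
      return 0
    fi
  done
  echo "aws-msk-iam-auth JAR not found in $kafka_home/libs" >&2
  return 1
}

# Example:
#   set_msk_classpath /home/ec2-user/kafka_2.13-3.5.1 && echo "$CLASSPATH"
```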

  8. Go to the kafka_2.13-{YOUR MSK VERSION}/config directory. Copy the following property settings and paste them into a new file. Name the file client.properties and save it.

    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
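Instead of pasting the settings into an editor, you might write the file from the shell. The following sketch uses a hypothetical write_client_properties helper that writes the four settings from this step, one property per line, to client.properties in a directory that you choose.

```shell
#!/usr/bin/env bash
# Hypothetical helper: writes the IAM client settings from this step to
# <dir>/client.properties, one property per line.
write_client_properties() {
  local dir="${1:-.}"
  cat > "$dir/client.properties" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
}

# Example: write the file into the Kafka config directory.
#   write_client_properties /home/ec2-user/kafka_2.13-3.5.1/config
```

The quoted heredoc delimiter ('EOF') keeps the shell from expanding anything inside the settings, so the file content is written exactly as shown.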
  9. (Optional) If you need to adjust the Java heap size for Kafka tools, you can set the KAFKA_HEAP_OPTS environment variable before running Kafka commands. This is useful if you encounter memory-related issues or are working with a large number of topics or partitions.

    The following example sets both the maximum and initial heap size to 512 megabytes. Adjust these values according to your specific requirements and available system resources.

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  10. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

  11. Wait for the status of your cluster to become Active. This might take several minutes. After the status becomes Active, choose the cluster name. This takes you to a page containing the cluster summary.

  12. Choose View client information.

  13. Copy the connection string for the private endpoint.

    The connection string contains one endpoint for each of the three brokers, separated by commas. You need only one broker endpoint for the following step.
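Because the connection string is a comma-separated list, you might take the first endpoint with shell parameter expansion. The following is a minimal sketch with a hypothetical first_broker helper. The commented AWS CLI call is an optional alternative to the console; it assumes a configured CLI and a hypothetical CLUSTER_ARN variable.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints the first endpoint from a comma-separated
# bootstrap broker string such as the one copied in step 13.
first_broker() {
  local brokers="$1"
  # Remove everything from the first comma onward.
  echo "${brokers%%,*}"
}

# Optional, assuming a configured AWS CLI and CLUSTER_ARN:
#   BROKERS="$(aws kafka get-bootstrap-brokers --cluster-arn "$CLUSTER_ARN" \
#     --query 'BootstrapBrokerStringSaslIam' --output text)"
#   BootstrapServerString="$(first_broker "$BROKERS")"
```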

  14. Run the following command, replacing BootstrapServerString with one of the broker endpoints that you obtained in the previous step.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapServerString --command-config client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic

    If you get a NoSuchFileException for the client.properties file, make sure that you run the command from the directory that contains this file (the config directory, if you followed step 8). Alternatively, provide the full path to the file in the Kafka command, as shown in the following example.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapServerString --command-config <path-to-client-properties>/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
    Note

    If you prefer not to set the CLASSPATH environment variable for your entire session, you can alternatively prefix each Kafka command with the CLASSPATH variable. This approach applies the classpath only to that specific command.

    For example, if you've installed Kafka in /home/ec2-user/kafka_2.13-3.5.1 and your broker endpoint is myBrokerEndpoint-1.myCluster.abc123.kafka.us-east-1.amazonaws.com:9098, prefix the Kafka command with the CLASSPATH variable, as shown in the following example.

    CLASSPATH=/home/ec2-user/kafka_2.13-3.5.1/libs/aws-msk-iam-auth-2.3.0-all.jar \
      /home/ec2-user/kafka_2.13-3.5.1/bin/kafka-topics.sh --create \
      --bootstrap-server myBrokerEndpoint-1.myCluster.abc123.kafka.us-east-1.amazonaws.com:9098 \
      --command-config client.properties \
      --replication-factor 3 \
      --partitions 1 \
      --topic MSKTutorialTopic

    If the command succeeds, you see the following message: Created topic MSKTutorialTopic.

    If the command is unsuccessful or you run into an error, see Troubleshoot your Amazon MSK cluster for troubleshooting information.
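The checks from step 14 can be combined into one wrapper. The following is a minimal sketch, not an official tool: create_tutorial_topic is a hypothetical name, and it assumes the directory layout used in this tutorial. It fails early if client.properties or the IAM JAR is missing, and otherwise runs the same kafka-topics.sh command with CLASSPATH applied to that command only.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper for step 14. Arguments: the Kafka installation
# directory, one broker endpoint, and the path to client.properties.
create_tutorial_topic() {
  local kafka_home="$1" bootstrap="$2" config="$3"
  # Fail early instead of letting Kafka raise a NoSuchFileException.
  if [ ! -f "$config" ]; then
    echo "client.properties not found: $config" >&2
    return 1
  fi
  # Find the aws-msk-iam-auth JAR (first match if there are several).
  local jars=("$kafka_home"/libs/aws-msk-iam-auth-*-all.jar)
  if [ ! -f "${jars[0]}" ]; then
    echo "aws-msk-iam-auth JAR not found in $kafka_home/libs" >&2
    return 1
  fi
  # Prefix CLASSPATH so that it applies to this command only.
  CLASSPATH="${jars[0]}" "$kafka_home/bin/kafka-topics.sh" --create \
    --bootstrap-server "$bootstrap" \
    --command-config "$config" \
    --replication-factor 3 \
    --partitions 1 \
    --topic MSKTutorialTopic
}

# Example usage (paths and endpoint are placeholders):
#   create_tutorial_topic /home/ec2-user/kafka_2.13-3.5.1 \
#     "$BootstrapServerString" /home/ec2-user/kafka_2.13-3.5.1/config/client.properties
# To confirm the topic afterward, you might run kafka-topics.sh with --describe:
#   CLASSPATH=... bin/kafka-topics.sh --describe --topic MSKTutorialTopic \
#     --bootstrap-server "$BootstrapServerString" --command-config client.properties
```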

Next Step

Step 5: Produce and consume data