Paso 4: creación de un tema en el clúster de Amazon MSK - Transmisión gestionada de Amazon para Apache Kafka

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Paso 4: creación de un tema en el clúster de Amazon MSK

En este paso de Introducción a Amazon MSK, debe instalar las bibliotecas y herramientas del cliente de Apache Kafka en el equipo cliente y, a continuación, creer un tema.

aviso

Los números de versión de Apache Kafka utilizados en este tutorial son solo ejemplos. Se recomienda utilizar la misma versión del cliente que la versión de clúster de MSK. Es posible que a una versión de cliente anterior le falten ciertas funciones y correcciones de errores críticos.

Determinar la versión del clúster de MSK

  1. Abra la consola de Amazon MSK en https://console.aws.amazon.com/msk/.

  2. En la barra de navegación, elige la región en la que creaste el clúster de MSK.

  3. Elija el clúster de MSK.

  4. Anote la versión de Apache Kafka utilizada en el clúster.

  5. Sustituya las instancias de los números de versión de Amazon MSK de este tutorial por la versión obtenida en el paso 3.

Crear un tema en la máquina cliente

  1. Conéctese a su máquina cliente.

    1. Abre la EC2 consola de Amazon en https://console.aws.amazon.com/ec2/.

    2. En el panel de navegación, seleccione Instances (Instancias). A continuación, seleccione la casilla de verificación situada junto al nombre de la máquina cliente en la que creóPaso 3: creación de un equipo cliente.

    3. Elija Actions (Acciones) y, a continuación, elija Connect (Conectar). Siga las instrucciones de la consola para conectarse al equipo cliente.

  2. Instale Java y configure la variable de entorno de la versión de Kafka.

    1. Instale Java en la máquina cliente ejecutando el siguiente comando.

      sudo yum -y install java-11
    2. Guarde la versión de Kafka del clúster de MSK en la variable de entornoKAFKA_VERSION, como se muestra en el siguiente comando. Necesitará esta información durante toda la configuración.

      export KAFKA_VERSION={KAFKA VERSION}

      Por ejemplo, si utiliza la versión 3.6.0, utilice el siguiente comando.

      export KAFKA_VERSION=3.6.0
  3. Descargue y extraiga Apache Kafka.

    1. Ejecute el siguiente comando para descargar Apache Kafka.

      wget https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz

      Por ejemplo, si su clúster de MSK usa la versión 3.6.0 de Apache Kafka, ejecute el siguiente comando.

      wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
      nota

      La siguiente lista presenta información alternativa sobre la descarga de Kafka que puede utilizar si tiene algún problema.

      • Si tiene problemas de conectividad o desea utilizar un sitio réplica, pruebe a utilizar el selector de réplicas de Apache, tal y como se muestra en el siguiente comando.

        wget https://www.apache.org/dyn/closer.cgi?path=/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz
      • Descargue la versión adecuada directamente desde el sitio web de Apache Kafka.

    2. Ejecute el siguiente comando en el directorio donde descargó el archivo TAR del paso anterior.

      tar -xzf kafka_2.13-$KAFKA_VERSION.tgz
    3. Guarde la ruta completa al directorio recién creado dentro de la variable de KAFKA_ROOT entorno.

      export KAFKA_ROOT=$(pwd)/kafka_2.13-$KAFKA_VERSION
  4. Configure la autenticación para su clúster de MSK.

    1. Descargue la versión más reciente del archivo JAR de IAM de Amazon MSK que se encuentra en el $KAFKA_ROOT/libs directorio. Use el siguiente comando para descargar el archivo y sustitúyalo por el {LATEST VERSION} número de versión real.

      cd $KAFKA_ROOT/libs wget https://github.com/aws/aws-msk-iam-auth/releases/latest/download/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      El archivo JAR de IAM de Amazon MSK permite que su máquina cliente acceda al clúster de MSK mediante la autenticación de IAM.

      nota

      Antes de ejecutar cualquier comando de Kafka que interactúe con el clúster de MSK, puede que tenga que añadir el archivo JAR de IAM de Amazon MSK a la ruta de clases de Java. Establezca la variable de CLASSPATH entorno, como se muestra en el siguiente ejemplo.

      export CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar

      Esto establece el valor CLASSPATH para toda la sesión, haciendo que el JAR esté disponible para todos los comandos de Kafka posteriores.

    2. Vaya al $KAFKA_ROOT/config directorio para crear el archivo de configuración del cliente.

      cd $KAFKA_ROOT/config
    3. Copie las siguientes configuraciones de propiedades y péguelas en un archivo nuevo. Guarde el archivo como client.properties.

      security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  5. (Opcional) Si tiene algún problema relacionado con la memoria o está trabajando con un gran número de temas o particiones, puede ajustar el tamaño del montón de Java para las herramientas de Kafka. Para ello, defina la variable de KAFKA_HEAP_OPTS entorno antes de ejecutar los comandos de Kafka.

    El siguiente ejemplo establece el tamaño máximo e inicial del montón en 512 megabytes. Ajuste estos valores en función de sus requisitos específicos y de los recursos del sistema disponibles.

    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
  6. Obtenga la información de conexión del clúster.

    1. Abra la consola de Amazon MSK en https://console.aws.amazon.com/msk/.

    2. Espere a que el estado del clúster pase a ser Activo. Esto podría tardar varios minutos. Cuando el estado pase a ser Activo, elija el nombre del clúster. Se le redirigirá a una página que contiene el resumen del clúster.

    3. Seleccione Ver información del cliente.

    4. Copie la cadena de conexión del punto de conexión privado.

      Obtendrá tres puntos de conexión para cada uno de los corredores. Guarde una de estas cadenas de conexión en la variable de entornoBOOTSTRAP_SERVER, como se muestra en el siguiente comando. <bootstrap-server-string>Sustitúyala por el valor real de la cadena de conexión.

      export BOOTSTRAP_SERVER=<bootstrap-server-string>
  7. Ejecute el siguiente comando para crear el tema.

    $KAFKA_ROOT/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic

    Si obtiene una NoSuchFileException para el client.properties archivo, asegúrese de que este archivo existe en el directorio de trabajo actual dentro del directorio bin de Kafka.

    nota

    Si prefiere no establecer la variable de CLASSPATH entorno para toda la sesión, también puede anteponer la variable a cada comando de Kafka. CLASSPATH Este enfoque aplica la ruta de clases solo a ese comando específico.

    CLASSPATH=$KAFKA_ROOT/libs/aws-msk-iam-auth-{LATEST VERSION}-all.jar \ $KAFKA_ROOT/bin/kafka-topics.sh --create \ --bootstrap-server $BOOTSTRAP_SERVER \ --command-config $KAFKA_ROOT/config/client.properties \ --replication-factor 3 \ --partitions 1 \ --topic MSKTutorialTopic
  8. (Opcional) Compruebe que el tema se haya creado correctamente.

    1. Si el comando se ejecuta correctamente, debería aparecer el siguiente mensaje: Created topic MSKTutorialTopic.

    2. Enumere todos los temas para confirmar que su tema existe.

      $KAFKA_ROOT/bin/kafka-topics.sh --list --bootstrap-server $BOOTSTRAP_SERVER --command-config $KAFKA_ROOT/config/client.properties

    Si el comando no funciona o se produce un error, consulte Solución de problemas del clúster de Amazon MSK para obtener información sobre la solución de problemas.

  9. (Opcional) Si desea conservar las variables de entorno para los siguientes pasos de este tutorial, omita este paso. De lo contrario, puede desconfigurar las variables, como se muestra en el siguiente ejemplo.

    unset KAFKA_VERSION KAFKA_ROOT BOOTSTRAP_SERVER CLASSPATH KAFKA_HEAP_OPTS

Paso siguiente

Paso 5: producción y consumo de datos