Uso de Kafka Streams con agentes Express de MSK y Amazon MSK sin servidor
Kafka Streams admite transformaciones sin estado y con estado. Las transformaciones con estado, como el recuento, la agregación o la unión, utilizan operadores que almacenan su estado en temas internos de Kafka. Además, algunas transformaciones sin estado, como groupBy o la redistribución de particiones, almacenan sus resultados en temas internos de Kafka. De forma predeterminada, Kafka Streams asigna nombres a estos temas internos en función del operador correspondiente. Si estos temas no existen, Kafka Streams crea temas internos de Kafka. Para crear los temas internos, Kafka Streams codifica de forma rígida la configuración segment.bytes y la establece en 50 MB. MSK aprovisionado con agentes Express y MSK sin servidor protegen algunas configuraciones de temas, incluido el tamaño del segmento, durante la creación del tema. Por lo tanto, una aplicación de Kafka Streams con transformaciones con estado no puede crear los temas internos cuando se utilizan agentes Express de MSK o MSK sin servidor.
Para ejecutar este tipo de aplicaciones de Kafka Streams en agentes Express de MSK o en MSK sin servidor, debe crear los temas internos por su cuenta. Para ello, primero identifique y asigne un nombre a los operadores de Kafka Streams, que requieren temas. A continuación, cree los temas internos de Kafka correspondientes.
nota
-
Se considera una práctica recomendada asignar nombres manualmente a los operadores en Kafka Streams, especialmente a aquellos que dependen de temas internos. Para obtener información sobre la asignación de nombres a operadores, consulte Asignación de nombres a operadores en una aplicación de DSL de Kafka Streams
en la documentación de Kafka Streams. -
El nombre del tema interno de una transformación con estado depende del
application.idde la aplicación de Kafka Streams y del nombre del operador con estado,application.id-statefuloperator_name.
Creación de una aplicación de Kafka Streams con agentes Express de MSK o MSK sin servidor
Si la aplicación de Kafka Streams tiene su application.id establecido en msk-streams-processing, puede crear una aplicación de Kafka Streams con agentes Express de MSK o MSK sin servidor. Para ello, utilice el operador count(), que requiere un tema interno con ese nombre. Por ejemplo, msk-streams-processing-count-store.
Para crear una aplicación de Kafka Streams, haga lo siguiente:
Temas
Cómo identificar operadores y asignarles un nombre
-
Identifique los procesadores con estado mediante Transformaciones con estado
en la documentación de Kafka Streams. Algunos ejemplos de procesadores con estado incluyen
count,aggregateojoin. -
Identifique los procesadores que crean temas para la redistribución de particiones.
El siguiente ejemplo contiene una operación
count(), que requiere estado.var stream = paragraphStream .groupByKey() .count() .toStream(); -
Para asignar un nombre al tema, agregue un nombre para cada procesador con estado. Según el tipo de procesador, la asignación de nombres se realiza mediante una clase de asignación de nombres diferente. Por ejemplo, la operación
count()es una operación de agregación. Por lo tanto, requiere la claseMaterialized.Para obtener información sobre las clases de asignación de nombres para las operaciones con estado, consulte Conclusión
en la documentación de Kafka Streams. En el siguiente ejemplo, se establece el nombre del operador
count()comocount-storemediante la claseMaterialized.var stream = paragraphStream .groupByKey() .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())) .toStream();
Creación de los temas internos
Kafka Streams antepone el prefijo application.id a los nombres de los temas internos, donde application.id es definido por el usuario. Por ejemplo, application.id-internal_topic_name. Los temas internos son temas normales de Kafka, y puede crear los temas con la información disponible en Creación de un tema de Apache Kafka o AdminClient de la API de Kafka.
Según el caso de uso, puede utilizar las políticas predeterminadas de limpieza y retención de Kafka Streams o personalizar sus valores. Estas políticas se definen en cleanup.policy yretention.ms .
En el siguiente ejemplo, se crean los temas con la API de AdminClient y se establece el application.id en msk-streams-processing.
try (AdminClient client = AdminClient.create(configs.kafkaProps())) { Collection<NewTopic> topics = new HashSet<>(); topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3)); client.createTopics(topics); }
Una vez que los temas se han creado en el clúster, su aplicación de Kafka Streams puede utilizar el tema msk-streams-processing-count-store para la operación count().
(Opcional) Comprobación del nombre del tema
Puede utilizar el descriptor de topología para describir la topología del flujo y ver los nombres de los temas internos. En el siguiente ejemplo, se muestra cómo ejecutar el descriptor de topología.
final StreamsBuilder builder = new StreamsBuilder(); Topology topology = builder.build(); System.out.println(topology.describe());
La siguiente salida muestra la topología del flujo del ejemplo anterior.
Topology Description: Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) --> KSTREAM-AGGREGATE-0000000001 Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store]) --> KTABLE-TOSTREAM-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KTABLE-TOSTREAM-0000000002 (stores: []) --> KSTREAM-SINK-0000000003 <-- KSTREAM-AGGREGATE-0000000001 Sink: KSTREAM-SINK-0000000003 (topic: output_topic) <-- KTABLE-TOSTREAM-0000000002
Para obtener información sobre cómo usar el descriptor de topología, consulte Asignación de nombres a operadores en una aplicación de DSL de Kafka Streams
Ejemplos de asignación de nombres a operadores
Esta sección proporciona algunos ejemplos de asignación de nombres a operadores.
Ejemplo de asignación de nombre a un operador para la groupByKey()
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
Ejemplo de asignación de nombre a un operador para normal count()
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
Ejemplo de asignación de nombre a un operador para windowed count()
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
Ejemplo de asignación de nombre a un operador para windowed suppressed()
windowed suppressed() -> Suppressed<Windowed> suppressed = Suppressed .untilWindowCloses(Suppressed.BufferConfig.unbounded()) .withName("kafka-suppressed"); .suppress(suppressed)