Uso do Kafka Streams com agentes MSK Express e MSK Serverless
O Kafka Streams é compatível com transformações com e sem estado. Transformações com estado, como contar, agregar ou unir, usam operadores que armazenam seu estado em tópicos internos do Kafka. Além disso, algumas transformações sem estado, como groupBy ou repartição, armazenam seus resultados em tópicos internos do Kafka. Por padrão, o Kafka Streams nomeia esses tópicos internos com base no operador correspondente. Se esses tópicos não existirem, o Kafka Streams criará os tópicos internos do Kafka. Para criar os tópicos internos, o Kafka Streams codifica a configuração segment.bytes e a define em 50 MB. O MSK Provisioned com agentes Express e o MSK Serverless protegem algumas configurações de tópicos, incluindo segment.size, durante a criação do tópico. Portanto, uma aplicação do Kafka Streams com transformações com estado falha ao criar os tópicos internos usando os agentes MSK Express ou MSK Serverless.
Para executar essas aplicações do Kafka Streams em agentes MSK Express ou MSK Serverless, você mesmo deve criar os tópicos internos. Para isso, primeiro identifique e nomeie os operadores do Kafka Streams que exigem tópicos. Em seguida, crie os tópicos internos correspondentes do Kafka.
nota
-
É uma boa prática nomear os operadores manualmente no Kafka Streams, especialmente aqueles que dependem de tópicos internos. Para obter informações sobre como nomear operadores, consulte Nomeando operadores em uma aplicação DSL do Kafka Streams
na documentação do Kafka Streams. -
O nome do tópico interno de uma transformação com estado depende da
application.idda aplicação do Kafka Streams e do nome do operador com estado,application.id-statefuloperator_name.
Criação de uma aplicação Kafka Streams usando agentes MSK Express ou MSK Serverless
Se sua aplicação Kafka Streams tiver a application.id configurada como msk-streams-processing, você poderá criar uma aplicação Kafka Streams usando agentes MSK Express ou MSK Serverless. Para fazer isso, use o operador count(), que requer um tópico interno com o nome. Por exemplo, msk-streams-processing-count-store.
Para criar uma aplicação Kafka Streams, faça o seguinte:
Tópicos
Identifique e nomeie os operadores
-
Identifique os processadores com estado usando as Transformações com estado
na documentação do Kafka Streams. Alguns exemplos de processadores com estado incluem
count,aggregateoujoin. -
Identifique os processadores que criam tópicos para reparticionamento.
O exemplo a seguir contém uma operação
count()que precisa de um estado.var stream = paragraphStream .groupByKey() .count() .toStream(); -
Para nomear o tópico, adicione um nome para cada processador com estado. Com base no tipo de processador, a nomenclatura é feita por uma classe de nomenclatura diferente. Por exemplo, a operação
count()é de agregação. Portanto, ela precisa da classeMaterialized.Para obter informações sobre as classes de nomenclatura para as operações com estado, consulte a Conclusão
na documentação do Kafka Streams. O exemplo a seguir define o nome do operador
count()paracount-storeusar a classeMaterialized.var stream = paragraphStream .groupByKey() .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long())) .toStream();
Criação dos tópicos internos
O Kafka Streams coloca prefixos application.id em nomes de tópicos internos, em que application.id é definido pelo usuário. Por exemplo, application.id-internal_topic_name. Os tópicos internos são normais do Kafka, e você pode criar os tópicos usando as informações disponíveis em Criar um tópico do Apache Kafka ou AdminClient da API do Kafka.
Dependendo do seu caso de uso, você pode usar as políticas padrão de limpeza e retenção do Kafka Streams ou personalizar seus valores. Você os define em cleanup.policy e retention.ms.
O exemplo a seguir cria os tópicos com a API AdminClient e define o application.id como msk-streams-processing.
try (AdminClient client = AdminClient.create(configs.kafkaProps())) { Collection<NewTopic> topics = new HashSet<>(); topics.add(new NewTopic("msk-streams-processing-count-store", 3, (short) 3)); client.createTopics(topics); }
Depois que os tópicos forem criados no cluster, sua aplicação Kafka Streams poderá usar o tópico msk-streams-processing-count-store na operação count().
(Opcional) Verificar o nome do tópico
Você pode usar o descritor de topografia para descrever a topologia do seu stream e visualizar os nomes dos tópicos internos. O exemplo a seguir mostra como executar o descritor da topologia.
final StreamsBuilder builder = new StreamsBuilder(); Topology topology = builder.build(); System.out.println(topology.describe());
A saída a seguir mostra a topologia do fluxo para o exemplo anterior.
Topology Description: Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input_topic]) --> KSTREAM-AGGREGATE-0000000001 Processor: KSTREAM-AGGREGATE-0000000001 (stores: [count-store]) --> KTABLE-TOSTREAM-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KTABLE-TOSTREAM-0000000002 (stores: []) --> KSTREAM-SINK-0000000003 <-- KSTREAM-AGGREGATE-0000000001 Sink: KSTREAM-SINK-0000000003 (topic: output_topic) <-- KTABLE-TOSTREAM-0000000002
Para obter informações sobre como usar o descritor de topologia, consulte Nomeando operadores em uma aplicação DSL do Kafka Streams
Exemplos de nomenclatura de operadores
Esta seção fornece exemplos de operações de nomenclatura.
Exemplo de operador de nomenclatura para groupByKey()
groupByKey() -> groupByKey(Grouped.as("kafka-stream-groupby"))
Exemplo de operador de nomenclatura para contagem normal()
normal count() -> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
Exemplo de operador de nomenclatura para contagem em janela()
windowed count() -> .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("kafka-streams-window") // descriptive name for the store .withKeySerde(Serdes.String()) .withValueSerde(Serdes.Long()))
Exemplo de operador de nomenclatura para suprimido em janela()
windowed suppressed() -> Suppressed<Windowed> suppressed = Suppressed .untilWindowCloses(Suppressed.BufferConfig.unbounded()) .withName("kafka-suppressed"); .suppress(suppressed)