Desenvolver uma aplicação de consumo da Kinesis Client Library em Node.js - Amazon Kinesis Data Streams

Desenvolver uma aplicação de consumo da Kinesis Client Library em Node.js

Importante

As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. A KCL 1.x chegará ao fim do suporte em 30 de janeiro de 2026. É altamente recomendável que você migre suas aplicações da KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a página da Amazon Kinesis Client Library no GitHub. Para obter informações sobre as versões mais recentes da KCL, consulte Usar a Kinesis Client Library. Para ter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte Migrar da KCL 1.x para a KCL 3.x.

É possível usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Node.js.

A KCL é uma biblioteca Java. O suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada MultilangDaemon. Esse daemon baseado em Java é executado em segundo plano quando você usa uma linguagem de KCL diferente de Java. Portanto, mesmo que a KCL para Node.js seja instalada e a aplicação de consumo seja inteiramente criada em Node.js, ainda é necessário ter Java instalado no sistema por causa do MultiLangDaemon. Além disso, o MultiLangDaemon tem algumas configurações padrão, como a região da AWS à qual ele se conecta, podem ser personalizadas para cada caso de uso. Para obter mais informações sobre a MultiLangDaemon no GitHub, acesse a página do projeto da MultiLangDaemon da KCL.

Para fazer download da KCL Node.js do GitHub, acesse Kinesis Client Library (Node.js).

Downloads de códigos de exemplo

Há dois exemplos de código disponíveis para KCL em Node.js:

  • basic-sample

    Usado nas seções a seguir para ilustrar os conceitos básicos de criação de uma aplicação de consumo da KCL em Node.js.

  • click-stream-sample

    Levemente mais avançado e usa um cenário real, para depois que houver familiaridade com o código de exemplo básico. Esse exemplo não é discutido aqui, mas há um arquivo README com mais informações.

É necessário concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Node.js:

Implementar o processador de registros

A aplicação de consumo mais simples possível usando a KCL para Node.js deve implementar uma função recordProcessor, que, por sua vez, contém as funções initialize, processRecords e shutdown. O exemplo fornece uma implementação que pode ser usada como ponto de partida (consulte sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
inicializar

A KCL chama a função initialize quando o processador de registros é iniciado. Esse processador de registros processa apenas o ID do fragmento passado como initializeInput.shardId e, normalmente, o inverso também é verdadeiro (esse fragmento é processado somente por esse processador de registro). No entanto, a aplicação de consumo deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo pelo menos uma vez, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador na aplicação de consumo. Para obter mais informações sobre casos em que um fragmento específico pode ser processado por mais de um operador, consulte Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos.

initialize: function(initializeInput, completeCallback)
processRecords

A KCL chama essa função com uma entrada contendo uma lista de registros de dados do fragmento especificado para a função initialize. O processador de registros implementado processará os dados nesses registros de acordo com a semântica da aplicação de consumo. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

processRecords: function(processRecordsInput, completeCallback)

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição, que o operador pode usar ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário de record expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

record.data record.sequenceNumber record.partitionKey

Observe que os dados são codificados em Base64.

No exemplo básico, a função processRecords tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento com um objeto checkpointer passado como processRecordsInput.checkpointer. O processador de registros chama a função checkpointer.checkpoint para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações ao reiniciar o processamento do fragmento para continuar a partir do último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem checkpoint para indicar que o processamento dos fragmentos originais foi concluído.

Se u mnúmero de sequência não for passado para a função checkpoint, a KCL presumirá que a chamada para checkpoint significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar checkpoint somente após ter processado todos os registros na lista que foi passada para ele. Os processadores de registros não precisam chamar checkpoint em cada chamada para processRecords. Um processador pode, por exemplo, chamar checkpoint a cada terceira chamada, ou algum evento externo para o processador de registros, como um serviço de validação/verificação personalizado que tenha sido implementado.

É possível, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para checkpoint. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

O aplicativo de exemplo básico mostra a chamada mais simples possível para a função checkpointer.checkpoint. É possível adicionar outra lógica de verificação que precisar para o consumidor neste ponto da função.

shutdown

A KCL chama a função shutdown quando o processamento termina (shutdownInput.reason é TERMINATE) ou quando o operador não está mais respondendo (shutdownInput.reason é ZOMBIE).

shutdown: function(shutdownInput, completeCallback)

O processamento termina quando o processador de registros não recebe mais registros do fragmento porque ele foi dividido ou intercalado, ou o fluxo foi excluído.

A KCL também passa um objeto shutdownInput.checkpointer para shutdown. Se o motivo do desligamento for TERMINATE, é necessário verificar se o processador de registros terminou o processamento de todos os registros de dados e, em seguida, chamar a função checkpoint nessa interface.

Modificar as propriedades de configuração

O exemplo fornece valores padrão para as propriedades de configuração. É possível substituir qualquer uma dessas propriedades por seus próprios valores (consulte sample.properties no exemplo básico).

Nome da aplicação

A KCL exige uma aplicação exclusiva entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:

  • Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo fluxo. Esses operadores podem ser distribuídos em várias instâncias. Ao executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.

  • A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para obter mais informações, consulte Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL.

Configurar credenciais

Você precisa disponibilizar as credenciais da AWS para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. A propriedade AWSCredentialsProvider pode ser usada para definir um provedor de credenciais. O arquivo sample.properties precisa disponibilizar as credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Ao executar o consumidor em uma instância do Amazon EC2, recomenda-se que a instância seja configurada com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para uma aplicação de consumo em execução em uma instância do EC2.

O exemplo a seguir configura a KCL para processar um fluxo de dados do Kinesis chamado kclnodejssample usando o processador de registros fornecido em sample_kcl_app.js:

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON