Adicione fontes de dados de streaming ao Managed Service for Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Adicione fontes de dados de streaming ao Managed Service for Apache Flink

O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma fonte do Apache Flink para receber dados de um fluxo. Esta seção descreve as fontes que estão disponíveis para os serviços da Amazon.

Use streams de dados do Kinesis

O KinesisStreamsSource fornece dados de streaming para seu aplicativo a partir de um stream de dados do Amazon Kinesis.

Criar uma KinesisStreamsSource

O exemplo de código a seguir demonstra como criar um KinesisStreamsSource:

// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();

Para obter mais informações sobre o uso de umKinesisStreamsSource, consulte o Amazon Kinesis Data Streams Connector na documentação do Apache Flink e KinesisConnectors nosso exemplo público no Github.

Crie um KinesisStreamsSource que use um consumidor EFO

KinesisStreamsSourceAgora é compatível com Enhanced Fan-Out (EFO).

Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.

Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, consulte FLIP-128: Enhanced Fan Out for Kinesis Consumers. AWS

Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:

  • READER_TYPE: defina esse parâmetro como EFO para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream.

  • EFO_CONSUMER_NAME: defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído.

Para configurar um KinesisStreamsSource para usar o EFO, adicione os seguintes parâmetros ao consumidor:

sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");

Para ver um exemplo de um serviço gerenciado para o aplicativo Apache Flink que usa um consumidor EFO, consulte nosso exemplo público do Kinesis Connectors no Github.

Use o Amazon MSK

A fonte KafkaSource fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK.

Criar uma KafkaSource

O exemplo de código a seguir demonstra como criar um KafkaSource:

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

Para obter mais informações sobre como usar uma KafkaSource, consulte Replicação do MSK.