Adicione fontes de dados de transmissão ao Managed Service for Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink (Amazon MSF) era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

Adicione fontes de dados de transmissão ao Managed Service for Apache Flink

O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma fonte do Apache Flink para receber dados de um fluxo. Esta seção descreve as fontes que estão disponíveis para os serviços da Amazon.

Use fluxos de dados do Kinesis

A KinesisStreamsSource fornece dados de transmissão para seu aplicativo a partir de um fluxo de dados do Amazon Kinesis.

Crie um KinesisStreamsSource

O exemplo de código a seguir demonstra como criar um KinesisStreamsSource:

// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();

Para obter mais informações sobre o uso de um KinesisStreamsSource, consulte Conector do Amazon Kinesis Data Streams na documentação do Apache Flink e nosso exemplo público de KinesisConnectors no Github.

Crie um KinesisStreamsSource que usa um consumidor EFO

O KinesisStreamsSource agora oferece suporte ao Enhanced Fan-Out (EFO).

Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.

Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, consulte FLIP-128: distribuição avançada para consumidores da AWS Kinesis.

Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:

  • READER_TYPE: defina esse parâmetro como EFO para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream.

  • EFO_CONSUMER_NAME: defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído.

Para configurar um KinesisStreamsSource para usar o EFO, adicione os seguintes parâmetros ao consumidor:

sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");

Para obter um exemplo de um aplicativo do Managed Service for Apache Flink que usa um consumidor EFO, consulte nosso exemplo público de Conectores do Kinesis no Github.

Use o Amazon MSK

A fonte KafkaSource fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK.

Crie um KafkaSource

O exemplo de código a seguir demonstra como criar um KafkaSource:

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

Para obter mais informações sobre como usar uma KafkaSource, consulte Replicação do MSK.