Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Adicione fontes de dados de streaming ao Managed Service for Apache Flink
O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma fonte do Apache Flink
Use streams de dados do Kinesis
O KinesisStreamsSource
fornece dados de streaming para seu aplicativo a partir de um stream de dados do Amazon Kinesis.
Criar uma KinesisStreamsSource
O exemplo de código a seguir demonstra como criar um KinesisStreamsSource
:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
Para obter mais informações sobre o uso de umKinesisStreamsSource
, consulte o Amazon Kinesis Data
Crie um KinesisStreamsSource
que use um consumidor EFO
KinesisStreamsSource
Agora é compatível com Enhanced Fan-Out (EFO
Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.
Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, consulte FLIP-128: Enhanced Fan Out
Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:
READER_TYPE: defina esse parâmetro como EFO para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream.
EFO_CONSUMER_NAME: defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído.
Para configurar um KinesisStreamsSource
para usar o EFO, adicione os seguintes parâmetros ao consumidor:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
Para ver um exemplo de um serviço gerenciado para o aplicativo Apache Flink que usa um consumidor EFO, consulte nosso exemplo público do Kinesis
Use o Amazon MSK
A fonte KafkaSource
fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK.
Criar uma KafkaSource
O exemplo de código a seguir demonstra como criar um KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Para obter mais informações sobre como usar uma KafkaSource
, consulte Replicação do MSK.