O Amazon Managed Service para Apache Flink (Amazon MSF) era conhecido anteriormente como Amazon Kinesis Data Analytics for Apache Flink.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Adicione fontes de dados de streaming ao Managed Service for Apache Flink
O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma fonte do Apache Flink
Use streams de dados do Kinesis
O KinesisStreamsSource
fornece dados de streaming para seu aplicativo a partir de um stream de dados do Amazon Kinesis.
Criar uma KinesisStreamsSource
O exemplo de código a seguir demonstra como criar um KinesisStreamsSource
:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
Para obter mais informações sobre o uso de umKinesisStreamsSource
, consulte o Amazon Kinesis Data
Crie um KinesisStreamsSource
que use um consumidor EFO
KinesisStreamsSource
Agora é compatível com Enhanced Fan-Out (EFO
Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.
Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, consulte FLIP-128: Enhanced Fan Out
Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:
READER_TYPE: defina esse parâmetro como EFO para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream.
EFO_CONSUMER_NAME: defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído.
Para configurar um KinesisStreamsSource
para usar o EFO, adicione os seguintes parâmetros ao consumidor:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
Para ver um exemplo de um serviço gerenciado para o aplicativo Apache Flink que usa um consumidor EFO, consulte nosso exemplo público do Kinesis
Use o Amazon MSK
A fonte KafkaSource
fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK.
Criar uma KafkaSource
O exemplo de código a seguir demonstra como criar um KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Para obter mais informações sobre como usar uma KafkaSource
, consulte Replicação do MSK.