Anteriormente, o Amazon Managed Service for Apache Flink (Amazon MSF) era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
Adicione fontes de dados de transmissão ao Managed Service for Apache Flink
O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma fonte do Apache Flink
Use fluxos de dados do Kinesis
A KinesisStreamsSource fornece dados de transmissão para seu aplicativo a partir de um fluxo de dados do Amazon Kinesis.
Crie um KinesisStreamsSource
O exemplo de código a seguir demonstra como criar um KinesisStreamsSource:
// Configure the KinesisStreamsSource Configuration sourceConfig = new Configuration(); sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST // Create a new KinesisStreamsSource to read from specified Kinesis Stream. KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder() .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream") .setSourceConfig(sourceConfig) .setDeserializationSchema(new SimpleStringSchema()) .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used. .build();
Para obter mais informações sobre o uso de um KinesisStreamsSource, consulte Conector do Amazon Kinesis Data Streams
Crie um KinesisStreamsSource que usa um consumidor EFO
O KinesisStreamsSource agora oferece suporte ao Enhanced Fan-Out (EFO)
Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.
Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, consulte FLIP-128: distribuição avançada para consumidores da AWS Kinesis
Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:
READER_TYPE: defina esse parâmetro como EFO para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream.
EFO_CONSUMER_NAME: defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído.
Para configurar um KinesisStreamsSource para usar o EFO, adicione os seguintes parâmetros ao consumidor:
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO); sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
Para obter um exemplo de um aplicativo do Managed Service for Apache Flink que usa um consumidor EFO, consulte nosso exemplo público de Conectores do Kinesis no Github
Use o Amazon MSK
A fonte KafkaSource fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK.
Crie um KafkaSource
O exemplo de código a seguir demonstra como criar um KafkaSource:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
Para obter mais informações sobre como usar uma KafkaSource, consulte Replicação do MSK.