

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisez des connecteurs pour déplacer des données dans le service géré pour Apache Flink avec l'API DataStream
<a name="how-connectors"></a>

Dans l' DataStream API Amazon Managed Service for Apache Flink, les *connecteurs* sont des composants logiciels qui déplacent les données vers et depuis une application Managed Service for Apache Flink. Les connecteurs sont des intégrations flexibles qui vous permettent de lire des fichiers et des répertoires. Les connecteurs sont constitués de modules complets permettant d’interagir avec les services Amazon et les systèmes tiers.

Les types de connecteurs sont les suivants :
+ [Ajouter des sources de données de streaming](how-sources.md) : fournit des données à votre application à partir d’un flux de données Kinesis, d’un fichier ou d’une autre source de données.
+ [Écrire des données à l'aide de récepteurs](how-sinks.md): envoyez des données depuis votre application vers un flux de données Kinesis, un flux Firehose ou une autre destination de données.
+ [Utiliser des E/S asynchrones](how-async.md) : fournit un accès asynchrone à une source de données (telle qu’une base de données) pour enrichir les événements de flux. 

## Connecteurs disponibles
<a name="how-connectors-list"></a>

L’environnement Apache Flink contient des connecteurs permettant d’accéder aux données provenant de diverses sources. Pour obtenir des informations sur les connecteurs disponibles dans l’environnement Apache Flink, consultez [Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) dans la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Avertissement**  
Si vous avez des applications exécutées sur Flink 1.6, 1.8, 1.11 ou 1.13 et que vous souhaitez les exécuter dans les régions du Moyen-Orient (EAU), de l'Asie-Pacifique (Hyderabad), d'Israël (Tel Aviv), de l'Europe (Zurich), du Moyen-Orient (EAU), de l'Asie-Pacifique (Melbourne) ou de l'Asie-Pacifique (Jakarta), vous devrez peut-être reconstruire l'archive de vos applications avec un connecteur mis à jour ou passer à Flink 1.18.   
Les connecteurs Apache Flink sont stockés dans leurs propres référentiels open source. Si vous effectuez une mise à niveau vers la version 1.18 ou ultérieure, vous devez mettre à jour vos dépendances. Pour accéder au référentiel des AWS connecteurs Apache Flink, consultez [flink-connector-aws](https://github.com/apache/flink-connector-aws).  
L'ancienne source Kinesis `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` n'est plus disponible et pourrait être supprimée dans une future version de Flink. Utilisez [plutôt Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Il n'y a aucune compatibilité entre les états `FlinkKinesisConsumer` et`KinesisStreamsSource`. Pour plus de détails, consultez la section [Migration de tâches existantes vers la nouvelle source Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) dans la documentation d'Apache Flink.  
 Voici les directives recommandées :   


**Améliorations de connecteurs**  

| Version Flink | Connecteur utilisé | Résolution | 
| --- | --- | --- | 
| 1,19, 1,20 | Source de Kinesis |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur source Kinesis Data Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1,19, 1,20 | Évier Kinesis |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur Kinesis Data Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1,19, 1,20 | Source des flux DynamoDB |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur source DynamoDB Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1,19, 1,20 | Récepteur DynamoDB | Lors de la mise à niveau vers le service géré pour Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur DynamoDB le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1,19, 1,20 | Évier Amazon SQS |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur Amazon SQS le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1,19, 1,20 | Service géré par Amazon pour Prometheus Sink |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous d'utiliser le connecteur récepteur Amazon Managed Service for Prometheus le plus récent. Il doit s'agir de n'importe quelle version 1.0.0 ou ultérieure. Pour plus d'informations, consultez [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Ajouter des sources de données de streaming au service géré pour Apache Flink
<a name="how-sources"></a>

Apache Flink fournit des connecteurs pour lire à partir de fichiers, de sockets, de collections et de sources personnalisées. Dans le code de votre application, vous utilisez une [source Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) pour recevoir les données d’un flux. Cette section décrit les sources disponibles pour les services Amazon.

## Utiliser les flux de données Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`fournit des données de streaming à votre application à partir d'un flux de données Amazon Kinesis. 

### Créer une `KinesisStreamsSource`
<a name="input-streams-create"></a>

L’exemple de code suivant illustre la création d’un `KinesisStreamsSource` :

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Pour plus d'informations sur l'utilisation d'un`KinesisStreamsSource`, consultez le connecteur [Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) dans la documentation d'Apache Flink [et notre exemple KinesisConnectors public](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) sur Github.

### Créez un `KinesisStreamsSource` qui utilise un consommateur EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Il est désormais compatible avec [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Si un client Kinesis utilise EFO, le service Kinesis Data Streams lui fournit sa propre bande passante dédiée, au lieu que le consommateur partage la bande passante fixe du flux avec les autres consommateurs lisant le flux.

Pour plus d'informations sur l'utilisation d'EFO avec les consommateurs Kinesis, [consultez FLIP-128 : Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) for Kinesis Consumers. AWS 

Vous activez le consommateur EFO en définissant les paramètres suivants sur le consommateur Kinesis :
+ **READER\$1TYPE :** définissez ce paramètre **sur EFO** pour que votre application utilise un consommateur EFO pour accéder aux données Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME :** définissez ce paramètre sur une valeur de chaîne unique parmi les consommateurs de ce flux. La réutilisation d’un nom de consommateur dans le même flux de données Kinesis entraînera la résiliation du client qui utilisait ce nom précédemment. 

Pour configurer un `KinesisStreamsSource` afin d’utiliser EFO, ajoutez les paramètres suivants au consommateur :

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Pour un exemple de service géré pour une application Apache Flink utilisant un client EFO, consultez [notre exemple public de Kinesis Connectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) sur Github.

## Utiliser Amazon MSK
<a name="input-msk"></a>

La source `KafkaSource` fournit des données de streaming à votre application à partir d’une rubrique Amazon MSK. 

### Créer une `KafkaSource`
<a name="input-msk-create"></a>

L’exemple de code suivant illustre la création d’un `KafkaSource` :

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Pour plus d’informations sur l’utilisation d’un `KafkaSource`, consultez [Réplication MSK](earlier.md#example-msk).

# Écrire des données à l'aide de récepteurs dans le service géré pour Apache Flink
<a name="how-sinks"></a>

Dans le code de votre application, vous pouvez utiliser n'importe quel connecteur [récepteur Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) pour écrire dans des systèmes externes, y compris AWS des services tels que Kinesis Data Streams et DynamoDB.

Apache Flink fournit également des récepteurs pour les fichiers et les sockets, et vous pouvez implémenter des récepteurs personnalisés. Parmi les différents éviers pris en charge, les suivants sont fréquemment utilisés :

## Utiliser les flux de données Kinesis
<a name="sinks-streams"></a>

Apache Flink fournit des informations sur le connecteur [Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) dans la documentation d’Apache Flink.

Pour un exemple d’application qui utilise un flux de données Kinesis pour l’entrée et la sortie, consultez [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md).

## Utiliser Apache Kafka et Amazon Managed Streaming pour Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Le [connecteur Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) fournit un support complet pour la publication de données sur Apache Kafka et Amazon MSK, y compris des garanties « une seule fois ». Pour savoir comment écrire dans Kafka, consultez les [exemples de connecteurs Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) dans la documentation d'Apache Flink.

## Utiliser Amazon S3
<a name="sinks-s3"></a>

Vous pouvez utiliser le `StreamingFileSink` Apache Flink pour écrire des objets dans un compartiment Amazon S3.

Pour un exemple sur la façon d’écrire des objets dans S3, consultez[Exemple : écriture dans un compartiment Amazon S3](earlier.md#examples-s3). 

## Utilisez Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`Il s'agit d'un récepteur Apache Flink fiable et évolutif permettant de stocker les résultats des applications à l'aide du service [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). Cette section décrit comment configurer un projet Maven pour créer et utiliser un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Créer une `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Exemple de code `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Créer une `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

L’exemple de code suivant illustre la création d’un `FlinkKinesisFirehoseProducer` :

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Exemple de code `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

L'exemple de code suivant montre comment créer et configurer un flux de données Apache Flink `FlinkKinesisFirehoseProducer` et comment envoyer des données au service Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Pour un didacticiel complet sur l'utilisation du lavabo Firehose, voir. [Exemple : écrire dans Firehose](earlier.md#get-started-exercise-fh)

# Utiliser l'asynchrone I/O dans le service géré pour Apache Flink
<a name="how-async"></a>

Un I/O opérateur asynchrone enrichit les données de flux à l'aide d'une source de données externe telle qu'une base de données. Le service géré pour Apache Flink enrichit les événements du flux de manière asynchrone afin que les demandes puissent être groupées pour une plus grande efficacité. 

Pour plus d'informations, consultez la section [E/S asynchrones dans la documentation d'](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)Apache Flink.