

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utiliser un cluster Delta Lake avec Flink
<a name="Deltacluster-flink"></a>

Avec Amazon EMR version 6.11 ou ultérieure, vous pouvez utiliser Delta Lake avec votre cluster Flink. Les exemples suivants utilisent le AWS CLI pour travailler avec Delta Lake sur un cluster Amazon EMR Flink.

**Note**  
Amazon EMR prend en charge l' DataStream API Flink lorsque vous utilisez Delta Lake avec un cluster Flink.

## Création d'un cluster Delta Lake
<a name="Deltacreate-a-delta-cluster"></a>

1. Créez un fichier, `delta_configurations.json`, contenant les éléments suivants :

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. Créez un cluster avec la configuration suivante. Remplacez `example Amazon S3 bucket path` et `subnet ID` par les vôtres.

   ```
   aws emr create-cluster 
   --release-label emr-6.11.0   
   --applications Name=Flink  
   --configurations file://delta_configurations.json   
   --region us-east-1  --name My_Spark_Delta_Cluster  
   --log-uri  s3://amzn-s3-demo-bucket/  
   --instance-type m5.xlarge  
   --instance-count 3   
   --service-role EMR_DefaultRole_V2  
   --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## Initialisation d'une session Flink yarn
<a name="Deltainit-flink-yarn"></a>

Pour initialiser une session Flink yarn, exécutez la commande suivante :

```
flink-yarn-session -d
```

## Créer une tâche Flink avec Delta Lake
<a name="Deltabuild-flink-with-delta-lake"></a>

Les exemples suivants montrent comment utiliser sbt ou Maven pour créer votre tâche Flink avec Delta Lake.

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html) est un outil de construction pour Scala que vous pouvez utiliser avec peu ou pas de configuration lorsque vous avez de petits projets. 

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```

------
#### [ Maven ]

[Maven](https://maven.apache.org) est un outil d'automatisation de build open source de l'Apache Software Foundation. Avec Maven, vous pouvez créer, publier et déployer une tâche Flink avec Delta Lake sur Amazon EMR.

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_$scala-main-version</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>$hadoop-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

------

## Écrire dans une table Delta avec l'API Flink Datastream
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

Utilisez l'exemple suivant pour créer un DeltaSink pour écrire dans la table avec un `deltaTablePath:`

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Lire à partir d'une table Delta avec l'API Flink Datastream
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

Utilisez l'exemple suivant pour créer un borné DeltaSource à lire dans le tableau avec un `deltaTablePath:`

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```

## Création de récepteurs avec prise en charge de plusieurs clusters pour le système autonome de Delta Lake
<a name="Deltasink-creation-with-multi-cluster"></a>

Utilisez l'exemple suivant pour créer une table DeltaSink à écrire avec un `deltaTablePath` [support multi-clusters](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup) :

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Exécuter la tâche Flink
<a name="Deltarun-flink-job"></a>

Utilisez la commande suivante pour exécuter votre tâche :

```
flink run FlinkJob.jar
```