

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden Sie einen Delta Lake-Cluster mit Flink
<a name="Deltacluster-flink"></a>

Mit Amazon-EMR-Version 6.11 und höher können Sie Delta Lake mit Ihrem Flink-Cluster verwenden. In den folgenden Beispielen wird der verwendet AWS CLI , um mit Delta Lake auf einem Amazon EMR Flink-Cluster zu arbeiten.

**Anmerkung**  
Amazon EMR unterstützt die DataStream Flink-API, wenn Sie Delta Lake mit einem Flink-Cluster verwenden.

## Einen Delta-Lake-Cluster erstellen
<a name="Deltacreate-a-delta-cluster"></a>

1. Erstellen Sie eine Datei, `delta_configurations.json`, mit folgendem Inhalt:

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. Erstellen Sie einen Cluster mit der folgenden Konfiguration. Ersetzen Sie `example Amazon S3 bucket path` und `subnet ID` durch Ihre eigenen Werte.

   ```
   aws emr create-cluster 
   --release-label emr-6.11.0   
   --applications Name=Flink  
   --configurations file://delta_configurations.json   
   --region us-east-1  --name My_Spark_Delta_Cluster  
   --log-uri  s3://amzn-s3-demo-bucket/  
   --instance-type m5.xlarge  
   --instance-count 3   
   --service-role EMR_DefaultRole_V2  
   --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## Eine Flink-Yarn-Sitzung installieren
<a name="Deltainit-flink-yarn"></a>

Um eine Flink-Yarn-Sitzung zu initialisieren, führen Sie den folgenden Befehl aus:

```
flink-yarn-session -d
```

## Einen Flink-Auftrag mit Delta Lake erstellen
<a name="Deltabuild-flink-with-delta-lake"></a>

Die folgenden Beispiele zeigen, wie Sie mit SBT oder Maven für die Erstellung Ihres Flink-Auftrags mit Delta Lake verwenden.

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html) ist ein Build-Tool für Scala, das Sie mit wenig bis gar keiner Konfiguration verwenden können, wenn Sie kleine Projekte haben. 

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```

------
#### [ Maven ]

[Maven](https://maven.apache.org) ist ein Open-Source-Tool zur Build-Automatisierung von der Apache Software Foundation. Mit Maven können Sie einen Flink-Auftrag mit Delta Lake auf Amazon EMR erstellen, veröffentlichen und bereitstellen.

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_$scala-main-version</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>$hadoop-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

------

## Mit der Flink-Datastream-API in eine Delta-Tabelle schreiben
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

Verwenden Sie das folgende Beispiel, um eine DeltaSink zu erstellen, mit der Sie in die Tabelle schreiben können `deltaTablePath:`

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Mit der Flink-Datastream-API aus einer Delta-Tabelle lesen
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

Verwenden Sie das folgende Beispiel, um eine Grenze zu erstellen DeltaSource , die aus der Tabelle mit einem gelesen werden soll `deltaTablePath:`

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```

## Sink-Erstellung mit Multi-Cluster-Unterstützung für Delta Lake Standalone
<a name="Deltasink-creation-with-multi-cluster"></a>

Verwenden Sie das folgende Beispiel, um eine Tabelle DeltaSink zum Schreiben in eine Tabelle mit A `deltaTablePath` - und [Multi-Cluster-Unterstützung](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup) zu erstellen:

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Den Flink-Auftrag ausführen
<a name="Deltarun-flink-job"></a>

Den folgenden Befehl verwenden, um Ihren Job auszuführen:

```
flink run FlinkJob.jar
```