

This is a machine-translated version. If there is any discrepancy between this translation and the original English text, the English text prevails.

# Use a Delta Lake cluster with Flink
<a name="Deltacluster-flink"></a>

Starting with Amazon EMR release 6.11, you can use Delta Lake with your Flink cluster. The following examples use the AWS CLI to work with Delta Lake on an Amazon EMR Flink cluster.

**Note**  
Amazon EMR supports the Flink DataStream API when you use Delta Lake with a Flink cluster.

## Create a Delta Lake cluster
<a name="Deltacreate-a-delta-cluster"></a>

1. Create a file, `delta_configurations.json`, with the following content:

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. Create a cluster with the following configuration. Replace the example Amazon S3 bucket path and the subnet ID with your own values.

   ```
   aws emr create-cluster \
   --release-label emr-6.11.0 \
   --applications Name=Flink \
   --configurations file://delta_configurations.json \
   --region us-east-1 \
   --name My_Flink_Delta_Cluster \
   --log-uri s3://amzn-s3-demo-bucket/ \
   --instance-type m5.xlarge \
   --instance-count 3 \
   --service-role EMR_DefaultRole_V2 \
   --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## Initialize a Flink YARN session
<a name="Deltainit-flink-yarn"></a>

To initialize a Flink YARN session, run the following command:

```
flink-yarn-session -d
```
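The `-d` flag starts the session in detached mode. You can also size the session up front with standard Flink options; the name and memory values below are only an example:

```shell
# Detached YARN session with an application name and explicit
# JobManager/TaskManager memory sizes.
flink-yarn-session -d -nm delta-session -jm 2048m -tm 4096m
```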

## Create a Flink job with Delta Lake
<a name="Deltabuild-flink-with-delta-lake"></a>

The following examples show how to use sbt or Maven to build a Flink job with Delta Lake.

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html) is a build tool for Scala that you can use with little or no configuration when you work on small projects.

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```
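The dependency list above references three version values (`deltaConnectorsVersion`, `flinkVersion`, `hadoopVersion`) that your `build.sbt` must define. A minimal sketch; the versions shown are assumptions that you should align with your EMR release:

```scala
// build.sbt — version values referenced by the dependency list above.
// These versions are examples matching EMR 6.11; adjust as needed.
val deltaConnectorsVersion = "0.6.0"
val flinkVersion = "1.16.1"
val hadoopVersion = "3.1.0"

scalaVersion := "2.12.17"
```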

------
#### [ Maven ]

[Maven](https://maven.apache.org) is an open-source build automation tool from the Apache Software Foundation. With Maven, you can build, publish, and deploy a Flink job with Delta Lake on Amazon EMR.

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>${delta-connectors-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_${scala.main.version}</artifactId>
        <version>${delta-connectors-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink-version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
</project>
```

------

## Write to a Delta table with the Flink DataStream API
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

Use the following example to create a `DeltaSink` to write to the table at `deltaTablePath`:

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```
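A minimal sketch of how the helper above might be wired into a job. The class name, the example schema, and the `buildSourceStream` placeholder are assumptions for illustration, not part of the connector API:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class DeltaSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical table schema: (name STRING, value INT).
        RowType rowType = RowType.of(
                new VarCharType(VarCharType.MAX_LENGTH),
                new IntType());

        // buildSourceStream is a placeholder for your own source
        // construction (Kafka, files, and so on).
        DataStream<RowData> sourceStream = buildSourceStream(env);

        // Attach the Delta sink defined in the example above.
        createDeltaSink(sourceStream, "s3://amzn-s3-demo-bucket/delta-table/", rowType);

        env.execute("flink-delta-sink-job");
    }
}
```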

## Read from a Delta table with the Flink DataStream API
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

Use the following example to create a bounded `DeltaSource` to read from the table at `deltaTablePath`:

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```
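A sketch of a complete job built around the helper above; the class name and table path are assumptions for illustration:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

public class DeltaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the current table snapshot as a bounded stream,
        // using the helper from the example above.
        DataStream<RowData> rows = createBoundedDeltaSourceAllColumns(
                env, "s3://amzn-s3-demo-bucket/delta-table/");

        // Print rows to the task logs; a real job would apply
        // transformations or a sink here instead.
        rows.print();

        env.execute("flink-delta-source-job");
    }
}
```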

## Create a sink with multi-cluster support for Delta Lake standalone
<a name="Deltasink-creation-with-multi-cluster"></a>

Use the following example to create a `DeltaSink` that writes to the table at `deltaTablePath` with [multi-cluster support](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup):

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    // Use the DynamoDB-backed LogStore so that concurrent writers
    // from multiple clusters commit safely to the same table.
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Run the Flink job
<a name="Deltarun-flink-job"></a>

To run your job, use the following command:

```
flink run FlinkJob.jar
```
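If the jar's manifest does not name a main class, or you want to control parallelism, the standard `flink run` flags apply. The class name below is a hypothetical example:

```shell
# -c names the entry class; -p sets the default parallelism.
flink run -c com.example.DeltaFlinkJob -p 4 FlinkJob.jar
```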