

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將 Delta Lake 叢集與 Flink 搭配使用
<a name="Deltacluster-flink"></a>

使用 Amazon EMR 6.11 版及更高版本，您可以將 Delta Lake 與 Flink 叢集搭配使用。下列範例使用 AWS CLI 在 Amazon EMR Flink 叢集上使用 Delta Lake。

**注意**  
當您將 Delta Lake 與 Flink 叢集搭配使用時，Amazon EMR 支援 Flink DataStream API。

## 建立 Delta Lake 叢集
<a name="Deltacreate-a-delta-cluster"></a>

1. 使用下列內容建立檔案 `delta_configurations.json`：

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. 使用下列組態建立叢集。將 `example Amazon S3 bucket path` 和 `subnet ID` 取代為您自己的值。

   ```
   aws emr create-cluster 
   --release-label emr-6.11.0   
   --applications Name=Flink  
   --configurations file://delta_configurations.json   
   --region us-east-1  --name My_Spark_Delta_Cluster  
   --log-uri  s3://amzn-s3-demo-bucket/  
   --instance-type m5.xlarge  
   --instance-count 3   
   --service-role EMR_DefaultRole_V2  
   --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## 初始化 Flink yarn 工作階段
<a name="Deltainit-flink-yarn"></a>

若要初始化 Flink yarn 工作階段，請執行下列命令：

```
flink-yarn-session -d
```

## 使用 Delta Lake 建置 Flink 作業
<a name="Deltabuild-flink-with-delta-lake"></a>

下列範例示範如何使用 sbt 或 Maven 透過 Delta Lake 建置 Flink 作業。

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html) 是用於 Scala 的建置工具，當您有小型專案時，只需很少的組態甚至無需組態即可使用。

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```

------
#### [ Maven ]

[Maven](https://maven.apache.org) 是來自 Apache Software Foundation 的開放原始碼建置自動化工具。使用 Maven，您可以在 Amazon EMR 上使用 Delta Lake 建置、發布和部署 Flink 作業。

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_$scala-main-version</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>$hadoop-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

------

## 使用 Flink Datastream API 寫入至 Delta 資料表
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

使用下列範例建立 DeltaSink 以寫入至具有 `deltaTablePath:` 的資料表

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## 使用 Flink Datastream API 從 Delta 資料表中讀取
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

使用下列範例建立一個限制的 DeltaSource 以從具有 `deltaTablePath:` 的資料表中讀取

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```

## 為 Delta Lake 獨立版建立具有多叢集支援的接收器
<a name="Deltasink-creation-with-multi-cluster"></a>

使用下列範例建立 DeltaSink 以寫入至具有 `deltaTablePath` 和[多叢集支援](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup)的資料表

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## 執行 Flink 作業
<a name="Deltarun-flink-job"></a>

使用下列命令來執行您的作業：

```
flink run FlinkJob.jar
```