

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Flink와 함께 Delta Lake 클러스터 사용
<a name="Deltacluster-flink"></a>

Amazon EMR 릴리스 6.11 이상에서는 Flink 클러스터에서 Delta Lake를 사용할 수 있습니다. 다음 예제에서는 AWS CLI 를 사용하여 Amazon EMR Flink 클러스터에서 Delta Lake로 작업합니다.

**참고**  
Amazon EMR은 Flink 클러스터에서 Delta Lake를 사용할 때 Flink DataStream API를 지원합니다.

## Delta Lake 클러스터 생성
<a name="Deltacreate-a-delta-cluster"></a>

1. 다음 콘텐츠가 포함된 `delta_configurations.json` 파일을 생성합니다.

   ```
   [{"Classification":"delta-defaults",  
       "Properties":{"delta.enabled":"true"}}]
   ```

1. 다음 구성을 사용하여 클러스터를 생성합니다. `example Amazon S3 bucket path` 및 `subnet ID`를 사용자 정보로 바꿉니다.

   ```
   aws emr create-cluster 
   --release-label emr-6.11.0   
   --applications Name=Flink  
   --configurations file://delta_configurations.json   
   --region us-east-1  --name My_Spark_Delta_Cluster  
   --log-uri  s3://amzn-s3-demo-bucket/  
   --instance-type m5.xlarge  
   --instance-count 3   
   --service-role EMR_DefaultRole_V2  
   --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

## Flink yarn 세션 초기화
<a name="Deltainit-flink-yarn"></a>

Flink yarn 세션을 초기화하려면 다음 명령을 실행합니다.

```
flink-yarn-session -d
```

## Delta Lake를 사용하여 Flink 작업 빌드
<a name="Deltabuild-flink-with-delta-lake"></a>

다음 예제에서는 Delta Lake에서 sbt 또는 Maven을 사용해 Flink 작업을 빌드하는 방법을 보여줍니다.

------
#### [ sbt ]

[sbt](https://www.scala-sbt.org/1.x/docs/index.html)는 소규모 프로젝트의 경우 거의 또는 전혀 구성하지 않고도 사용할 수 있는 Scala용 빌드 도구입니다.

```
libraryDependencies ++= Seq(
  "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided",
  "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided",
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided",
  "org.apache.flink" % "flink-table-common" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
```

------
#### [ Maven ]

[Maven](https://maven.apache.org)은 Apache Software Foundation의 오픈 소스 빌드 자동화 도구입니다. Maven을 사용하면 Amazon EMR에서 Delta Lake를 사용하여 Flink 작업을 빌드, 게시 및 배포할 수 있습니다.

```
<project>
<properties>
    <scala.main.version>2.12</scala.main.version>
    <delta-connectors-version>0.6.0</delta-connectors-version>
    <flink-version>1.16.1</flink-version>
    <hadoop-version>3.1.0</hadoop-version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-flink</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-standalone_$scala-main-version</artifactId>
        <version>$delta-connectors-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>$hadoop-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>$flink-version</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```

------

## Flink Datastream API를 사용하여 Delta 테이블에 쓰기
<a name="Deltawrite-delta-table-with-flink-datastream-api"></a>

다음 예제를 사용하여 `deltaTablePath:`를 통해 테이블에 쓰도록 DeltaSink 생성

```
public static DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath,
        RowType rowType) {
    Configuration configuration = new Configuration();
    DeltaSink<RowData> deltaSink = DeltaSink
            .forRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration,
                    rowType)
            .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Flink Datastream API를 사용하여 Delta 테이블에서 읽기
<a name="Deltaread-delta-table-with-flink-datastream-api"></a>

다음 예제를 사용하여 `deltaTablePath:`를 통해 테이블에서 데이터를 읽도록 바인딩된 DeltaSource 생성

```
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
        StreamExecutionEnvironment env,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    DeltaSource<RowData> deltaSource = DeltaSource
            .forBoundedRowData(
                    new org.apache.flink.core.fs.Path(deltaTablePath),
                    configuration)
            .build();

    return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
```

## Delta Lake 독립 실행형에 대한 다중 클러스터 지원을 통해 싱크 생성
<a name="Deltasink-creation-with-multi-cluster"></a>

다음 예제를 사용하여 `deltaTablePath` 및 [다중 클러스터 지원](https://docs.delta.io/latest/delta-standalone.html#multi-cluster-setup)을 통해 테이블에 쓰도록 DeltaSink를 생성합니다.

```
public DataStream<RowData> createDeltaSink(
        DataStream<RowData> stream,
        String deltaTablePath) {
    Configuration configuration = new Configuration();
    configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log");
    configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1");
        
    DeltaSink<RowData> deltaSink = DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            configuration,
            rowType)
        .build();
    stream.sinkTo(deltaSink);
    return stream;
}
```

## Flink 작업 실행
<a name="Deltarun-flink-job"></a>

다음 명령을 사용하여 작업을 실행합니다.

```
flink run FlinkJob.jar
```