

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Spark와 함께 Delta Lake 클러스터 사용
<a name="Deltausing-cluster-spark"></a>

Amazon EMR 버전 6.9.0부터 부트스트랩 작업을 포함할 필요 없이 Spark 클러스터에서 Delta Lake를 사용할 수 있습니다. Amazon EMR 버전 6.8.0 이하의 경우 부트스트랩 작업을 사용하여 필요한 모든 종속 항목을 사전 설치할 수 있습니다.

다음 예제에서는 AWS CLI 를 사용하여 Amazon EMR Spark 클러스터에서 Delta Lake로 작업합니다.

에서 Amazon EMR의 Delta Lake를 사용하려면 AWS Command Line Interface먼저 클러스터를 생성합니다. Delta Lake 분류를 지정하는 방법에 대한 자세한 내용은 [클러스터를 생성할 AWS Command Line Interface 때를 사용하여 구성 제공](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli) 또는 클러스터를 생성할 때 Java SDK로 구성 제공을 AWS Command Line Interface참조하세요. [https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk) 

1. 다음 콘텐츠가 포함된 `configurations.json` 파일을 생성합니다.

   ```
   [{"Classification":"delta-defaults",  "Properties":{"delta.enabled":"true"} }]
   ```

1. 다음과 같은 구성으로 클러스터를 생성하고 Amazon S3 **bucket path** 및 **subnet ID** 예제를 사용자 정보로 바꿉니다.

   ```
   aws emr create-cluster 
        --release-label  emr-6.9.0  
        --applications Name=Spark  
        --configurations file://delta_configurations.json   
        --region us-east-1  
        --name My_Spark_Delta_Cluster  
        --log-uri  s3://amzn-s3-demo-bucket/  
        --instance-type m5.xlarge  
        --instance-count 2   
        --service-role EMR_DefaultRole_V2  
        --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

   또는 Spark 작업에서 다음 파일을 JAR 종속 항목으로 사용하여 Amazon EMR 클러스터와 Spark 애플리케이션을 생성할 수 있습니다.

   ```
   /usr/share/aws/delta/lib/delta-core.jar,
   /usr/share/aws/delta/lib/delta-storage.jar,    
   /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
   ```
**참고**  
Amazon EMR 릴리스 7.0.0 이상을 사용하는 경우 `/usr/share/aws/delta/lib/delta-spark.jar` 대신를 사용합니다`/usr/share/aws/delta/lib/delta-core.jar`.

   자세한 내용은 [애플리케이션 제출](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)을 참조하세요.

   jar를 Spark 작업에 jar 종속 항목으로 포함하려면 Spark 애플리케이션에 다음 구성 속성을 추가할 수 있습니다.

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

   Spark 작업 종속 항목에 대한 자세한 내용은 [Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)를 참조하세요.

   Amazon EMR 릴리스 7.0.0 이상을 사용하는 경우 대신 `/usr/share/aws/delta/lib/delta-spark.jar` 구성을 추가합니다.

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

## Delta Lake용 Spark 세션 초기화
<a name="Deltainitialize-spark-session"></a>

다음 예제에서는 대화식 Spark 쉘을 시작하거나 Spark 제출을 사용하거나 Amazon EMR에서 Delta Lake를 작업하기 위해 Amazon EMR Notebooks를 사용하는 방법을 보여줍니다.

------
#### [ spark-shell ]

1. SSH를 사용하여 프라이머리 노드에 연결합니다. 자세한 내용은 *Amazon EMR 관리 안내서*에서 [SSH를 사용하여 프라이머리 노드에 연결](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)을 참조하세요.

1. Spark 셸을 시작하려면 다음 명령을 입력합니다. PySpark 쉘을 사용하려면 `spark-shell`을 `pyspark`로 바꿉니다.

   ```
   spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Amazon EMR 릴리스 6.15.0 이상을 실행하는 경우 다음 구성을 사용하여 Delta Lake에서 Lake Formation을 기반으로 세분화된 액세스 제어를 사용해야 합니다.

   ```
   spark-shell \  
     --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \  
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
     --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ spark-submit ]

1. SSH를 사용하여 프라이머리 노드에 연결합니다. 자세한 내용은 *Amazon EMR 관리 안내서*에서 [SSH를 사용하여 프라이머리 노드에 연결](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)을 참조하세요.

1. Delta Lake용 Spark 세션을 시작하려면 다음 명령을 입력합니다.

   ```
   spark-submit  
   —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
   —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Amazon EMR 릴리스 6.15.0 이상을 실행하는 경우 다음 구성을 사용하여 Delta Lake에서 Lake Formation을 기반으로 세분화된 액세스 제어를 사용해야 합니다.

   ```
   spark-submit \  `
   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension 
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
   --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ EMR Studio notebooks ]

Amazon EMR Studio 노트북을 사용하여 Spark 세션을 초기화하려면 다음 예제와 같이 Amazon EMR Notebooks에서 **%%configure** 매직 명령을 사용하여 Spark 세션을 구성합니다. 자세한 내용은 *Amazon EMR 관리 안내서*에서 [EMR Notebooks 매직 사용](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)을 참조하세요.

```
%%configure -f
{
  "conf": {
    "spark.sql.extensions":  "io.delta.sql.DeltaSparkSessionExtension",
     "spark.sql.catalog.spark_catalog":  "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
```

Amazon EMR 릴리스 6.15.0 이상을 실행하는 경우 다음 구성을 사용하여 Delta Lake에서 Lake Formation을 기반으로 세분화된 액세스 제어를 사용해야 합니다.

```
%%configure -f   
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}
```

------

## Delta Lake 테이블에 쓰기
<a name="Deltawrite-to-table"></a>

다음 예제에서는 Spark DataFrame을 생성하고 Delta Lake 데이터 세트로 쓰는 방법을 보여줍니다. 이 예제에서는 SSH를 기본 Hadoop 사용자로 사용하여 프라이머리 노드에 연결된 상태에서 Spark 쉘로 데이터 세트를 처리하는 방법을 보여줍니다.

**참고**  
코드 샘플을 Spark 쉘에 붙여넣으려면 프롬프트에 :paste를 입력하고 예제를 붙여넣은 다음 CTRL \$1 D를 누릅니다.

------
#### [ PySpark ]

Spark에는 Python 기반 쉘인 `pyspark`도 포함되어 있으며, 이 쉘을 사용하여 Python에서 작성된 Spark 프로그램을 시제품화할 수 있습니다. `spark-shell`과 마찬가지로 프라이머리 노드에서 `pyspark`를 간접 호출합니다.

```
## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.writeTo("delta_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
 
// Create a DataFrame
val data = Seq(("100",  "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date",  "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.write.format("delta").mode("append").saveAsTable("delta_table")
```

------
#### [ SQL ]

```
-- Create a Delta  Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string, 
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';

-- insert data into the table
INSERT INTO delta_table VALUES  ("100", "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01", "2015-01-01T13:51:40.519832Z");
```

------

## Delta Lake 테이블에서 읽기
<a name="Deltaread-from-table"></a>

------
#### [ PySpark ]

```
ddf = spark.table("delta_table")
ddf.show()
```

------
#### [ Scala ]

```
val ddf =  spark.table("delta_table")
ddf.show()
```

------
#### [ SQL ]

```
SELECT * FROM delta_table;
```

------