

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將 Delta Lake 叢集與 Spark 搭配使用
<a name="Deltausing-cluster-spark"></a>

從 Amazon EMR 6.9.0 版開始，您可以將 Delta Lake 與 Spark 叢集搭配使用，而無需執行引導操作。對於 Amazon EMR 6.8.0 版及更低版本，您可以使用引導操作來預先安裝必要的依存項目。

下列範例使用 AWS CLI 在 Amazon EMR Spark 叢集上使用 Delta Lake。

若要將 Delta Lake on Amazon EMR 與 搭配使用 AWS Command Line Interface，請先建立叢集。如需有關如何使用 指定 Delta Lake 分類的資訊 AWS Command Line Interface，請參閱[在建立叢集 AWS Command Line Interface 時使用 提供組態](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli)，或在[建立叢集時使用 Java 開發套件提供組態](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk)。

1. 使用下列內容建立檔案 `configurations.json`：

   ```
   [{"Classification":"delta-defaults",  "Properties":{"delta.enabled":"true"} }]
   ```

1. 使用下列組態建立叢集，並將範例 Amazon S3 **bucket path** 和 **subnet ID** 取代為您自己的值。

   ```
   aws emr create-cluster 
        --release-label  emr-6.9.0  
        --applications Name=Spark  
        --configurations file://delta_configurations.json   
        --region us-east-1  
        --name My_Spark_Delta_Cluster  
        --log-uri  s3://amzn-s3-demo-bucket/  
        --instance-type m5.xlarge  
        --instance-count 2   
        --service-role EMR_DefaultRole_V2  
        --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

   或者，您可以建立 Amazon EMR 叢集和 Spark 應用程式，並將下列檔案作為 Spark 作業中的 JAR 依存項目：

   ```
   /usr/share/aws/delta/lib/delta-core.jar,
   /usr/share/aws/delta/lib/delta-storage.jar,    
   /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
   ```
**注意**  
如果您使用 Amazon EMR 7.0.0 版或更新版本，請使用 `/usr/share/aws/delta/lib/delta-spark.jar`而非 `/usr/share/aws/delta/lib/delta-core.jar`。

   如需詳細資訊，請參閱[提交應用程式](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)。

   若要在 Spark 作業中包含 jar 依存項目，您可以將下列組態屬性新增至 Spark 應用程式：

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

   如需有關 Spark 作業相依性的詳細資訊，請參閱[相依性管理](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)。

   如果您使用 Amazon EMR 7.0.0 版或更新版本，請改為新增`/usr/share/aws/delta/lib/delta-spark.jar`組態。

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

## 初始化 Delta Lake 的 Spark 工作階段
<a name="Deltainitialize-spark-session"></a>

下列範例顯示如何啟動互動式 Spark Shell、使用 Spark 提交，或透過 Amazon EMR Notebooks 在 Amazon EMR 上使用 Delta Lake。

------
#### [ spark-shell ]

1. 使用 SSH 連接至主節點。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[使用 SSH 連接至主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell，請將 `spark-shell` 取代為 `pyspark`。

   ```
   spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   如果您執行 Amazon EMR 6.15.0 版或更新版本，您還必須使用下列組態，以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

   ```
   spark-shell \  
     --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \  
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
     --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ spark-submit ]

1. 使用 SSH 連接至主節點。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[使用 SSH 連接至主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 輸入下列命令啟動 Delta Lake 的 Spark 工作階段。

   ```
   spark-submit  
   —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
   —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   如果您執行 Amazon EMR 6.15.0 版或更新版本，您還必須使用下列組態，以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

   ```
   spark-submit \  `
   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension 
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
   --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ EMR Studio notebooks ]

若要使用 Amazon EMR Studio 筆記本初始化 Spark 工作階段，請使用 Amazon EMR Notebooks 中的 **%%configure** 魔法命令來設定 Spark 工作階段，如下列範例所示。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[使用 EMR Notebooks 魔法](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)。

```
%%configure -f
{
  "conf": {
    "spark.sql.extensions":  "io.delta.sql.DeltaSparkSessionExtension",
     "spark.sql.catalog.spark_catalog":  "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
```

如果您執行 Amazon EMR 6.15.0 版或更新版本，您還必須使用下列組態，以使用根據 Lake Formation 搭配 Delta Lake 的精細存取控制。

```
%%configure -f   
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}
```

------

## 寫入至 Delta Lake 資料表
<a name="Deltawrite-to-table"></a>

下列範例顯示如何建立 DataFrame 並將其寫入為 Delta Lake 資料集。此範例顯示如何使用 Spark Shell 處理資料集，同時使用 SSH 作為預設 hadoop 使用者連接至主節點。

**注意**  
若要將程式碼範例貼到 Spark Shell 中，請在提示字元中鍵入 :paste、貼上範例，然後按 CTRL \$1 D。

------
#### [ PySpark ]

Spark 包含 Python 型 Shell `pyspark`，您可以使用該 Shell 來開發以 Python 撰寫之 Spark 程式的原型。如同使用 `spark-shell`，在主節點上調用 `pyspark`。

```
## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.writeTo("delta_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
 
// Create a DataFrame
val data = Seq(("100",  "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date",  "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.write.format("delta").mode("append").saveAsTable("delta_table")
```

------
#### [ SQL ]

```
-- Create a Delta  Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string, 
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';

-- insert data into the table
INSERT INTO delta_table VALUES  ("100", "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01", "2015-01-01T13:51:40.519832Z");
```

------

## 從 Delta Lake 資料表中讀取
<a name="Deltaread-from-table"></a>

------
#### [ PySpark ]

```
ddf = spark.table("delta_table")
ddf.show()
```

------
#### [ Scala ]

```
val ddf =  spark.table("delta_table")
ddf.show()
```

------
#### [ SQL ]

```
SELECT * FROM delta_table;
```

------