

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Spark で Delta Lake クラスターを使用する
<a name="Deltausing-cluster-spark"></a>

Amazon EMR バージョン 6.9.0 以降では、ブートストラップアクションを追加しなくても、Spark クラスターで Delta Lake を使用できます。Amazon EMR リリース 6.8.0 以前の場合、ブートストラップアクションを使用することで、依存関係上必要なファイルを事前にインストールできます。

次の例では AWS CLI 、 を使用して Amazon EMR Spark クラスターで Delta Lake を操作します。

で Amazon EMR で Delta Lake を使用するには AWS Command Line Interface、まずクラスターを作成します。で Delta Lake 分類を指定する方法については AWS Command Line Interface、[「クラスターの作成 AWS Command Line Interface 時に を使用して設定](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-cli)を指定する」または「クラスターの作成[時に Java SDK を使用して設定を指定する](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-create-cluster.html#emr-configure-apps-create-cluster-sdk)」を参照してください。

1. 次のコンテンツを含む `configurations.json` ファイルを作成します。

   ```
   [{"Classification":"delta-defaults",  "Properties":{"delta.enabled":"true"} }]
   ```

1. 次のように設定してクラスターを作成し、サンプルの Amazon S3 **bucket path** と **subnet ID** を実際の値に置き換えます。

   ```
   aws emr create-cluster 
        --release-label  emr-6.9.0  
        --applications Name=Spark  
        --configurations file://delta_configurations.json   
        --region us-east-1  
        --name My_Spark_Delta_Cluster  
        --log-uri  s3://amzn-s3-demo-bucket/  
        --instance-type m5.xlarge  
        --instance-count 2   
        --service-role EMR_DefaultRole_V2  
        --ec2-attributes  InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
   ```

   次のファイルを、Spark ジョブに依存関係上必要な jar として使用することで、Amazon EMR クラスターと Spark アプリケーションを構築できます。

   ```
   /usr/share/aws/delta/lib/delta-core.jar,
   /usr/share/aws/delta/lib/delta-storage.jar,    
   /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
   ```
**注記**  
Amazon EMR リリース 7.0.0 以降を使用する場合は、 `/usr/share/aws/delta/lib/delta-spark.jar`の代わりに を使用します`/usr/share/aws/delta/lib/delta-core.jar`。

   詳細については、「[Submitting Applications](https://spark.apache.org/docs/latest/submitting-applications.html#submitting-applications)」を参照してください。

   Spark ジョブに依存関係上必要な jar を指定するには、次の設定プロパティを Spark アプリケーションに追加します。

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

   [Spark ジョブの依存関係については、「Dependency Management](https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management)」を参照してください。

   Amazon EMR リリース 7.0.0 以降を使用する場合は、代わりに `/usr/share/aws/delta/lib/delta-spark.jar`設定を追加します。

   ```
   --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar,
        /usr/share/aws/delta/lib/delta-storage.jar,
        /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"
   ```

## Delta Lake の Spark セッションを初期化する
<a name="Deltainitialize-spark-session"></a>

次の例は、インタラクティブな Spark シェルを起動する方法、Spark 送信を使用する方法、Amazon EMR Notebooks を使用して Amazon EMR の Delta Lake を操作する方法をそれぞれ示しています。

------
#### [ spark-shell ]

1. SSH を使用してプライマリノードに接続します。詳細については、「*Amazon EMR 管理ガイド*」の「[Connect to the primary node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)」を参照してください。

1. 以下のコマンドを入力して、Spark シェルを起動します。PySpark シェルを使用するには、`spark-shell` を `pyspark` に置き換えます。

   ```
   spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

   ```
   spark-shell \  
     --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension \  
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
     --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ spark-submit ]

1. SSH を使用してプライマリノードに接続します。詳細については、「*Amazon EMR 管理ガイド*」の「[Connect to the primary node using SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)」を参照してください。

1. Delta Lake の Spark セッションを起動するには、次のコマンドを入力します。

   ```
   spark-submit  
   —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
   —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
   ```

   Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

   ```
   spark-submit \  `
   --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension 
   --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \  
   --conf spark.sql.catalog.spark_catalog.lf.managed=true
   ```

------
#### [ EMR Studio notebooks ]

Amazon EMR Studio ノートブックを使用して Spark セッションを初期化するには、次の例のように、Amazon EMR Notebooks で **%%configure** マジックコマンドを使用して Spark セッションを設定します。詳細については、「*Amazon EMR 管理ガイド*」の「[Use EMR Notebooks magics](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-studio-magics.html#emr-magics)」を参照してください。

```
%%configure -f
{
  "conf": {
    "spark.sql.extensions":  "io.delta.sql.DeltaSparkSessionExtension",
     "spark.sql.catalog.spark_catalog":  "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}
```

Amazon EMR リリース 6.15.0 以降を実行する場合は、次の設定を使用して、Delta Lake での Lake Formation に基づくきめ細かなアクセスコントロールを使用する必要があります。

```
%%configure -f   
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}
```

------

## Delta Lake テーブルに書き込む
<a name="Deltawrite-to-table"></a>

次の例は、DataFrame を作成し、それを Delta Lake データセットとして書き込む方法を示しています。また、デフォルトの Hadoop ユーザーとして、プライマリノードに SSH 接続し、Spark シェルでデータセットを操作する方法を示しています。

**注記**  
コードサンプルを Spark シェルに貼り付けるには、プロンプトで「:paste」と入力し、例を貼り付けて、[CTRL \$1 D] を押します。

------
#### [ PySpark ]

Spark には、Python ベースのシェルである `pyspark` も用意されており、Python で記述した Spark プログラムのプロトタイプ作成に使用できます。`spark-shell` の場合と同様に、プライマリノードで `pyspark` を呼び出します。

```
## Create a DataFrame
data =  spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")],
["id", "creation_date",  "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string, creation_date string, 
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.writeTo("delta_table").append()
```

------
#### [ Scala ]

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
 
// Create a DataFrame
val data = Seq(("100",  "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01",  "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date",  "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3  location
spark.sql("""CREATE  TABLE IF NOT EXISTS delta_table (id string,
creation_date string,
last_update_time string)
USING delta location
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table'""");

data.write.format("delta").mode("append").saveAsTable("delta_table")
```

------
#### [ SQL ]

```
-- Create a Delta  Lake table with the S3 location
CREATE TABLE delta_table(id string,
creation_date string, 
last_update_time string)
USING delta LOCATION
's3://amzn-s3-demo-bucket/example-prefix/db/delta_table';

-- insert data into the table
INSERT INTO delta_table VALUES  ("100", "2015-01-01",  "2015-01-01T13:51:39.340396Z"),
("101",  "2015-01-01",  "2015-01-01T12:14:58.597216Z"),
("102",  "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103",  "2015-01-01", "2015-01-01T13:51:40.519832Z");
```

------

## Delta Lake テーブルから読み取る
<a name="Deltaread-from-table"></a>

------
#### [ PySpark ]

```
ddf = spark.table("delta_table")
ddf.show()
```

------
#### [ Scala ]

```
val ddf =  spark.table("delta_table")
ddf.show()
```

------
#### [ SQL ]

```
SELECT * FROM delta_table;
```

------