

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# EMRFS S3 向けに最適化されたコミットプロトコルの要件
<a name="emr-spark-commit-protocol-reqs"></a>

以下の条件が満たされる場合に、EMRFS S3 向けに最適化されたコミットプロトコルが使用されます。
+ Spark、DataFrame、または Dataset によりパーティション化されたテーブルを書き込む Spark ジョブを実行する。
+ パーティション上書きモードが `dynamic` である Spark ジョブを実行する。
+ Amazon EMR でマルチパートアップロードが有効になっている。これがデフォルトです。詳細については、「[EMRFS S3 向けに最適化されたコミットプロトコルとマルチパートアップロード](emr-spark-commit-protocol-multipart.md)」を参照してください。
+ EMRFS のファイルシステムキャッシュは有効になっています。これがデフォルトです。`fs.s3.impl.disable.cache` 設定が `false` に設定されていることを確認します。
+ Spark のビルトインデータソースサポートが使用されています。組み込みデータソースサポートは以下の状況で使用されます。
  + ジョブでビルトインのデータソースまたはテーブルに書き込む場合。
  + ジョブで Hive メタストア Parquet テーブルに書き込む場合。これは `spark.sql.hive.convertInsertingPartitionedTable` と `spark.sql.hive.convertMetastoreParquet` が両方とも true に設定されている場合に発生します。これらはデフォルトの設定です。
  + ジョブで Hive メタストア ORC テーブルに書き込む場合。これは、`spark.sql.hive.convertInsertingPartitionedTable` と `spark.sql.hive.convertMetastoreOrc` が両方 `true` に設定されている場合に発生します。これらはデフォルトの設定です。
+ デフォルトのパーティションの場所 (`${table_location}/k1=v1/k2=v2/` など) に書き込む Spark ジョブオペレーションでコミットプロトコルが使用される。ジョブオペレーションによってカスタムのパーティション場所に書き込まれる場合、例えば、カスタムのパーティション場所が `ALTER TABLE SQL` コマンドを使用して設定されている場合、プロトコルは使用されません。
+ Spark で以下の値を使用する。
  + `spark.sql.sources.commitProtocolClass` を `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` に設定する必要があります。これは、Amazon EMR リリース 5.30.0 以降および 6.2.0 以降のデフォルト設定です。
  + `partitionOverwriteMode` 書き込みオプションまたは `spark.sql.sources.partitionOverwriteMode` は `dynamic` に設定する必要があります。デフォルトの設定は `static` です。
**注記**  
`partitionOverwriteMode` 書き込みオプションは Spark 2.4.0 で導入されました。Amazon EMR リリース 5.19.0 に含まれている Spark バージョン 2.3.2 では、`spark.sql.sources.partitionOverwriteMode` プロパティを設定します。
  + Spark ジョブが Hive メタストアの Parquet テーブルに上書きされる場合は、`spark.sql.hive.convertMetastoreParquet`、`spark.sql.hive.convertInsertingPartitionedTable` および `spark.sql.hive.convertMetastore.partitionOverwriteMode` を `true` に設定する必要があります。これらはデフォルトの設定です。
  + Spark ジョブが Hive メタストアの ORC テーブルに上書きされる場合は、`spark.sql.hive.convertMetastoreOrc`、`spark.sql.hive.convertInsertingPartitionedTable` および `spark.sql.hive.convertMetastore.partitionOverwriteMode` を `true` に設定する必要があります。これらはデフォルトの設定です。

**Example - 動的パーティション上書きモード**  
この Scala の例では、最適化がトリガーされます。まず、`partitionOverwriteMode` プロパティを `dynamic` に設定します。これにより、データを書き込んでいるパーティションのみが上書きされます。次に、動的パーティション列が `partitionBy` で指定され、書き込みモードが `overwrite` に設定されます。  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## EMRFS S3 向けに最適化されたコミットプロトコルが使用されない場合
<a name="emr-spark-commit-protocol-reqs-anti"></a>

一般的に、EMRFS S3 向けに最適化されたコミットプロトコルは、オープンソースのデフォルトの Spark コミットプロトコル `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` と同じように機能します。次の状況では、最適化は行われません。


****  

| 状況 | コミットプロトコルが使用されない理由 | 
| --- | --- | 
| HDFS に書き込む場合 | コミットプロトコルは、EMRFS を使用した Amazon S3 への書き込みのみをサポートします。 | 
| S3A ファイルシステムを使用する場合 | コミットプロトコルは EMRFS のみをサポートします。 | 
| MapReduce または Spark の RDD API を使用する場合 | コミットプロトコルは、SparkSQL、DataFrame、またはデータセット API の使用のみをサポートします。 | 
| 動的パーティションの上書きがトリガーされない場合 | コミットプロトコルは、動的なパーティション上書きの場合のみを最適化します。その他の場合については、「[EMRFS S3 向けに最適化されたコミッターを使用する](emr-spark-s3-optimized-committer.md)」を参照してください。 | 

次の Scala の例は、EMRFS S3 向けに最適化されたコミットプロトコルが `SQLHadoopMapReduceCommitProtocol` に委任するその他の状況を示しています。

**Example - カスタムパーティションの場所を使用する動的パーティション上書きモード**  
この例では、Scala プログラムは動的パーティション上書きモードで 2 つのパーティションを上書きします。1 つのパーティションはカスタムのパーティション場所を使用します。もう 1 つのパーティションはデフォルトのパーティション場所を使用します。EMRFS S3 向けに最適化されたコミットプロトコルは、デフォルトのパーティション場所を使用するパーティションのみを改善します。  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Scala コードは以下の Amazon S3 オブジェクトを作成します。  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
以前の Spark バージョンでは、カスタムパーティションの場所に書き込むと、データが失われる可能性があります。この例では、パーティション `dt='2019-01-28'` が失われます。詳細については「[SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106)」を参照してください。Amazon EMR リリース 5.33.0 以降 (6.0.x および 6.1.x は除く) では、この問題は修正されています。

カスタムのパーティション場所に書き込むとき、先ほどの例と同様のコミットアルゴリズムが使用されます。先ほどの例と同様、このアルゴリズムでは名前が順番に変更されるため、パフォーマンスが低下する可能性があります。

Spark 2.4.0 のアルゴリズムは以下の手順に従います。

1. カスタムのパーティション場所に出力を書き込むとき、タスクでは最終的な出力場所に作成される Spark のステージングディレクトリにファイルを書き込みます。ファイルの名前には、ファイルの競合から保護するためのランダムな UUID が含まれます。タスクの試行によって各ファイルが最終的な出力パスと共に追跡されます。

1. タスクが正常に完了すると、ドライバーにファイルとそれらの最終的な出力パスが渡されます。

1. すべてのタスクが完了した後、ジョブのコミットフェーズでは、カスタムのパーティション場所に書き込まれたすべてのファイルの名前が、最終的な出力パスに順番に変更されます。

1. ステージングディレクトリがジョブのコミットフェーズの完了前に削除されます。