

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 经 EMRFS S3 优化的提交协议的要求
<a name="emr-spark-commit-protocol-reqs"></a>

满足以下条件时，将使用经 EMRFS S3 优化的提交协议：
+ 你运行使用 Spark DataFrames、或数据集来覆盖分区表的 Spark 作业。
+ 您可以运行分区覆盖模式为 `dynamic` 的 Spark 任务。
+ 分段上传在 Amazon EMR 中已启用。这是默认值。有关更多信息，请参阅 [经 EMRFS S3 优化的提交协议和分段上传](emr-spark-commit-protocol-multipart.md)。
+ EMRFS 的文件系统缓存已启用。这是默认值。检查设置 `fs.s3.impl.disable.cache` 是否设置为 `false`。
+ 使用 Spark 的内置数据来源支持。内置数据来源支持用于以下情况：
  + 当任务写入内置数据来源或表时。
  + 当任务写入 Hive 元存储 Parquet 表时。当 `spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastoreParquet` 都设置为 true 时，就会发生这种情况。这些是默认设置。
  + 当任务写入 Hive 元存储 ORC 表时。当 `spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastoreOrc` 都设置为 `true` 时，就会发生这种情况。这些是默认设置。
+ 写入默认分区位置的 Spark 任务操作，例如 `${table_location}/k1=v1/k2=v2/`，使用提交协议。如果任务操作写入自定义分区位置，则不使用协议，例如，如果使用 `ALTER TABLE SQL` 命令设置自定义分区位置。
+ 必须使用 Spark 的以下值：
  + `spark.sql.sources.commitProtocolClass` 必须设置为 `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`。这是 Amazon EMR 发行版 5.30.0 及更高版本和 Amazon EMR 发行版 6.2.0 及更高版本的默认设置。
  + `partitionOverwriteMode` 写入选项或 `spark.sql.sources.partitionOverwriteMode` 必须设置为 `dynamic`。默认设置为 `static`。
**注意**  
Spark 2.4.0 中引入了 `partitionOverwriteMode` 写入选项。对于随附了 Amazon EMR 版本 5.19.0 的 Spark 版本 2.3.2，请设置 `spark.sql.sources.partitionOverwriteMode` 属性。
  + 如果 Spark 任务覆盖了 Hive 元存储 Parquet 表，则 `spark.sql.hive.convertMetastoreParquet`、`spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastore.partitionOverwriteMode` 必须设置为 `true`。这些是默认设置。
  + 如果 Spark 任务覆盖了 Hive 元存储 ORC 表，则 `spark.sql.hive.convertMetastoreOrc`、`spark.sql.hive.convertInsertingPartitionedTable` 和 `spark.sql.hive.convertMetastore.partitionOverwriteMode` 必须设置为 `true`。这些是默认设置。

**Example ：动态分区覆盖模式**  
在此 Scala 示例中已触发优化。首先，将 `partitionOverwriteMode` 属性设置为 `dynamic`。这只会覆盖您正在写入数据的那些分区。然后，通过 `partitionBy` 指定动态分区列，并将写入模式设置为 `overwrite`。  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## 当不使用经 EMRFS S3 优化的提交协议时
<a name="emr-spark-commit-protocol-reqs-anti"></a>

通常，经 EMRFS S3 优化的提交协议的工作原理与开源默认 Spark 提交协议 `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` 相同。以下情况下不会进行优化。


****  

| 情况 | 为什么不使用提交协议 | 
| --- | --- | 
| 当您向 HDFS 写入时 | 提交协议只支持使用 EMRFS 写入 Amazon S3。 | 
| 当您使用 S3A 文件系统时 | 提交协议只支持 EMRFS。 | 
| 当你使用 MapReduce 或 Spark 的 RDD API 时 | 提交协议仅支持使用 sparkSQL DataFrame、或数据集。 APIs | 
| 当没有触发动态分区覆盖时 | 提交协议仅优化动态分区覆盖情况。有关其他情况，请参阅 [使用经 EMRFS S3 优化的提交程序](emr-spark-s3-optimized-committer.md)。 | 

以下 Scala 示例演示了经 EMRFS S3 优化的提交协议委托给 `SQLHadoopMapReduceCommitProtocol` 的一些其他情况。

**Example ：具有自定义分区位置的动态分区覆盖模式**  
在此示例中，Scala 程序以动态分区覆盖模式覆盖两个分区。一个分区具有自定义分区位置。另一个分区使用默认分区位置。经 EMRFS S3 优化的提交协议仅改进了使用默认分区位置的分区。  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Scala 代码创建以下 Amazon S3 对象：  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
在早期 Spark 版本中写入自定义分区位置可能会导致数据丢失。在此示例中，分区 `dt='2019-01-28'` 将丢失。有关详细信息，请参阅 [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106)。此问题已在 Amazon EMR 发行版 5.33.0 及更高版本（6.0.x 和 6.1.x 除外）中得到修复。

当写入到自定义位置的分区时，Spark 会使用类似于上一个示例的提交算法，如下所述。与前面的示例一样，该算法会导致顺序重命名，这可能会影响性能。

Spark 2.4.0 中的算法遵循以下步骤：

1. 在将输出写入自定义位置的分区时，任务会写入到 Spark 的暂存目录下的文件中，该目录是在最终输出位置下创建的。该文件的名称包含一个随机 UUID，以防止文件冲突。任务尝试跟踪每个文件以及最终所需的输出路径。

1. 在任务成功完成后，它会为驱动程序提供这些文件及其最终所需的输出路径。

1. 所有任务都完成后，作业提交阶段会按顺序将为自定义位置的分区写入的所有文件重命名为其最终输出路径。

1. 暂存目录会在作业提交阶段完成之前删除。