

# Use the EMRFS S3-optimized committer
<a name="emr-spark-s3-optimized-committer"></a>

The EMRFS S3-optimized committer is an alternative [OutputCommitter](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/OutputCommitter.html) implementation that is optimized for writing files to Amazon S3 when using EMRFS. The EMRFS S3-optimized committer improves application performance by avoiding list and rename operations performed in Amazon S3 during the job and task commit phases. The committer is available with Amazon EMR release 5.19.0 and later, and is enabled by default with Amazon EMR 5.20.0 and later. The committer is used for Spark jobs that use Spark SQL, DataFrames, or Datasets. Starting with Amazon EMR 6.4.0, this committer can be used for all common formats including Parquet, ORC, and text-based formats (including CSV and JSON). For releases prior to Amazon EMR 6.4.0, only the Parquet format is supported. There are circumstances under which the committer is not used. For more information, see [Requirements for the EMRFS S3-optimized committer](emr-spark-committer-reqs.md).

**Topics**
+ [Requirements for the EMRFS S3-optimized committer](emr-spark-committer-reqs.md)
+ [The EMRFS S3-optimized committer and multipart uploads](emr-spark-committer-multipart.md)
+ [Job tuning considerations](emr-spark-committer-tuning.md)
+ [Enable the EMRFS S3-optimized committer for Amazon EMR 5.19.0](emr-spark-committer-enable.md)

# Requirements for the EMRFS S3-optimized committer
<a name="emr-spark-committer-reqs"></a>

The EMRFS S3-optimized committer is used when the following conditions are met:
+ You run Spark jobs that use Spark SQL, DataFrames, or Datasets to write files to Amazon S3. Starting with Amazon EMR 6.4.0, this committer can be used for all common formats including Parquet, ORC, and text-based formats (including CSV and JSON). For releases prior to Amazon EMR 6.4.0, only the Parquet format is supported.
+ Multipart uploads are enabled in Amazon EMR. This is the default. For more information, see [The EMRFS S3-optimized committer and multipart uploads](emr-spark-committer-multipart.md).
+ Spark's built-in file format support is used. The built-in file format support is used in the following circumstances:
  + For Hive metastore tables, when `spark.sql.hive.convertMetastoreParquet` is set to `true` for Parquet tables, or `spark.sql.hive.convertMetastoreOrc` is set to `true` for ORC tables (Amazon EMR 6.4.0 or later). These are the default settings.
  + When jobs write to file format data sources or tables—for example, the target table is created with the `USING parquet` clause. 
  + When jobs write to non-partitioned Hive metastore Parquet tables. Spark's built-in Parquet support does not support partitioned Hive tables, which is a known limitation. For more information, see [Hive metastore Parquet table conversion](https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#hive-metastore-parquet-table-conversion) in the Apache Spark, DataFrames and Datasets Guide.
+ Spark job operations that write to a default partition location—for example, `${table_location}/k1=v1/k2=v2/`—use the committer. The committer is not used if a job operation writes to a custom partition location—for example, a custom partition location set using the `ALTER TABLE` SQL command.
+ The following values for Spark must be used:
  + The `spark.sql.parquet.fs.optimized.committer.optimization-enabled` property must be set to `true`. This is the default setting with Amazon EMR 5.20.0 and later. With Amazon EMR 5.19.0, the default value is `false`. For information about configuring this value, see [Enable the EMRFS S3-optimized committer for Amazon EMR 5.19.0](emr-spark-committer-enable.md).
  + If writing to non-partitioned Hive metastore tables, only the Parquet and ORC file formats are supported. `spark.sql.hive.convertMetastoreParquet` must be set to `true` if writing to non-partitioned Parquet Hive metastore tables. `spark.sql.hive.convertMetastoreOrc` must be set to `true` if writing to non-partitioned ORC Hive metastore tables. These are the default settings.
  + `spark.sql.parquet.output.committer.class` must be set to `com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter`. This is the default setting.
  + `spark.sql.sources.commitProtocolClass` must be set to `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` or `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`. `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol` is the default setting for the Amazon EMR 5.x series version 5.30.0 and higher, and for the Amazon EMR 6.x series version 6.2.0 and higher. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` is the default setting for previous Amazon EMR versions.
  + If Spark jobs overwrite partitioned Parquet datasets with dynamic partition columns, then the `partitionOverwriteMode` write option and `spark.sql.sources.partitionOverwriteMode` must be set to `static`. This is the default setting.
**Note**  
The `partitionOverwriteMode` write option was introduced in Spark 2.4.0. For Spark version 2.3.2, included with Amazon EMR release 5.19.0, set the `spark.sql.sources.partitionOverwriteMode` property. 
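
As an illustration of the default partition layout referred to above, the following sketch builds a Hive-style partition path from partition column values. The helper name and signature are hypothetical; Spark derives these paths internally, and this only models the layout the committer expects.

```
// Hypothetical helper: builds the default Hive-style partition path,
// e.g. ${table_location}/k1=v1/k2=v2/. This only illustrates the layout;
// Spark computes such paths internally.
def defaultPartitionPath(tableLocation: String,
                         partitions: Seq[(String, String)]): String = {
  val segments = partitions.map { case (k, v) => s"$k=$v" }.mkString("/")
  s"${tableLocation.stripSuffix("/")}/$segments/"
}
```

A partition set of `k1=v1, k2=v2` under `s3://bucket/table` yields `s3://bucket/table/k1=v1/k2=v2/`, which is the default location the committer handles; any path that deviates from this pattern counts as a custom partition location.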

## Occasions when EMRFS S3-optimized committer is not used
<a name="emr-spark-committer-reqs-anti"></a>

Generally, the EMRFS S3-optimized committer isn't used in the following situations.


| Situation | Why the committer is not used | 
| --- | --- | 
| When you write to HDFS | The committer only supports writing to Amazon S3 using EMRFS. | 
| When you use the S3A file system | The committer only supports EMRFS. | 
| When you use MapReduce or Spark's RDD API | The committer only supports using SparkSQL, DataFrame, or Dataset APIs. | 

The following Scala examples demonstrate additional situations that prevent the EMRFS S3-optimized committer from being used entirely (the first example) or partially (the second example).

**Example – Dynamic partition overwrite mode**  
The following Scala example instructs Spark to use a different commit algorithm, which prevents use of the EMRFS S3-optimized committer altogether. The code sets the `partitionOverwriteMode` property to `dynamic` to overwrite only those partitions to which you're writing data. Then, dynamic partition columns are specified by `partitionBy`, and the write mode is set to `overwrite`.   

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("s3://amzn-s3-demo-bucket1/output")
```
When all three settings are configured, the EMRFS S3-optimized committer is not used. Instead, Spark executes a different commit algorithm that's specified in Spark's commit protocol. For Amazon EMR 5.x releases earlier than 5.30.0 and for Amazon EMR 6.x releases earlier than 6.2.0, the commit protocol uses Spark's staging directory, which is a temporary directory created under the output location that starts with `.spark-staging`. The algorithm sequentially renames partition directories, which can negatively impact performance. For more information about Amazon EMR releases 5.30.0 and later and 6.2.0 and later, see [Use the EMRFS S3-optimized commit protocol](emr-spark-s3-optimized-commit-protocol.md).   
The algorithm in Spark 2.4.0 follows these steps:  

1. Task attempts write their output to partition directories under Spark's staging directory—for example, `${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/`.

1. For each partition written, the task attempt keeps track of relative partition paths—for example, `k1=v1/k2=v2`.

1. When a task completes successfully, it provides the driver with all relative partition paths that it tracked.

1. After all tasks complete, the job commit phase collects all the partition directories that successful task attempts wrote under Spark's staging directory. Spark sequentially renames each of these directories to its final output location using directory tree rename operations.

1. The staging directory is deleted before the job commit phase completes.
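
The rename step that dominates this algorithm's cost can be sketched in plain Scala, with no Spark dependency. The names below are hypothetical; this only models how, after all tasks report their relative partition paths, the driver maps each staging partition directory to its final location, one rename at a time:

```
// Hypothetical model of the job-commit rename step for dynamic partition
// overwrite. Each successful task reports the relative partition paths it
// wrote (step 3); job commit renames each staging directory to its final
// location sequentially (step 4).
case class TaskResult(relativePartitions: Set[String])

def jobCommitRenames(outputLocation: String,
                     jobId: String,
                     results: Seq[TaskResult]): Seq[(String, String)] = {
  val staging = s"$outputLocation/spark-staging-$jobId"
  // Collect every partition written by any successful task attempt
  val partitions = results.flatMap(_.relativePartitions).distinct.sorted
  // One sequential directory-tree rename per partition -- the performance cost
  partitions.map(p => (s"$staging/$p", s"$outputLocation/$p"))
}
```

Because each pair in the returned sequence corresponds to one directory-tree rename executed in order, the job commit time grows with the number of partitions written, which is why this path is slower on Amazon S3 than the EMRFS S3-optimized committer.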

**Example – Custom partition location**  
In this example, the Scala code inserts into two partitions. One partition has a custom partition location. The other partition uses the default partition location. The EMRFS S3-optimized committer is only used for writing task output to the partition that uses the default partition location.  

```
val table = "dataset"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")
                            
// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")
                            
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
                            
def asDate(text: String) = lit(text).cast("date")
                            
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .write.insertInto(table)
```
The Scala code creates the following Amazon S3 objects:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
When writing to partitions at custom locations, Spark uses a commit algorithm similar to the previous example, which is outlined below. As with the earlier example, the algorithm results in sequential renames, which may negatively impact performance.  

1. When writing output to a partition at a custom location, tasks write to a file under Spark's staging directory, which is created under the final output location. The name of the file includes a random UUID to protect against file collisions. The task attempt keeps track of each file along with the final desired output path.

1. When a task completes successfully, it provides the driver with the files and their final desired output paths.

1. After all tasks complete, the job commit phase sequentially renames all files that were written for partitions at custom locations to their final output paths.

1. The staging directory is deleted before the job commit phase completes.

# The EMRFS S3-optimized committer and multipart uploads
<a name="emr-spark-committer-multipart"></a>

To use the EMRFS S3-optimized committer, multipart uploads must be enabled for Amazon EMR. Multipart uploads are enabled by default; if they have been disabled, re-enable them before using the committer. For more information, see [Configure multipart upload for Amazon S3](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#Config_Multipart) in the *Amazon EMR Management Guide*.

The EMRFS S3-optimized committer uses the transaction-like characteristics of multipart uploads to ensure files written by task attempts only appear in the job's output location upon task commit. By using multipart uploads in this way, the committer improves task commit performance over the default FileOutputCommitter algorithm version 2. When using the EMRFS S3-optimized committer, there are some key differences from traditional multipart upload behavior to consider:
+ Multipart uploads are always performed regardless of the file size. This differs from the default behavior of EMRFS, where the `fs.s3n.multipart.uploads.split.size` property controls the file size at which multipart uploads are triggered.
+ Multipart uploads are left in an incomplete state for a longer period of time until the task commits or aborts. This differs from the default behavior of EMRFS where a multipart upload completes when a task finishes writing a given file.

Because of these differences, if a Spark executor JVM crashes or is killed while tasks are running and writing data to Amazon S3, incomplete multipart uploads are more likely to be left behind. For this reason, when you use the EMRFS S3-optimized committer, be sure to follow the best practices for managing failed multipart uploads. For more information, see [Best practices](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html#emr-bucket-bestpractices) for working with Amazon S3 buckets in the *Amazon EMR Management Guide*.
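
One common way to clean up the incomplete multipart uploads described above is an S3 bucket lifecycle rule that aborts uploads left incomplete for a given number of days. The following is a sketch of such a lifecycle configuration; the rule ID and the seven-day threshold are example values, not recommendations from this guide. Choose a threshold longer than your longest-running job so in-progress task uploads are not aborted.

```
{
  "Rules": [
    {
      "ID": "abort-incomplete-multipart-uploads",
      "Status": "Enabled",
      "Filter": {},
      "AbortIncompleteMultipartUpload": {
        "DaysAfterInitiation": 7
      }
    }
  ]
}
```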

# Job tuning considerations
<a name="emr-spark-committer-tuning"></a>

The EMRFS S3-optimized committer consumes a small amount of memory for each file written by a task attempt until the task is committed or aborted. In most jobs, the amount of memory consumed is negligible. For jobs with long-running tasks that write a large number of files, the memory that the committer consumes may be noticeable and require adjustments to the memory allocated for Spark executors. You can tune executor memory using the `spark.executor.memory` property. As a guideline, a single task writing 100,000 files typically requires an additional 100 MB of memory. For more information, see [Application properties](https://spark.apache.org/docs/latest/configuration.html#application-properties) in the Apache Spark Configuration documentation.
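
The guideline above implies roughly 1,000 bytes of committer state per file written. Under that assumption (an extrapolation from the guideline, not a documented figure), a back-of-envelope estimate of the extra executor memory per task is:

```
// Rough estimate only: ~1,000 bytes of committer state per file written,
// extrapolated from the guideline of ~100 MB per 100,000 files per task.
val bytesPerFile = 1000L

def extraCommitterMemoryMB(filesPerTask: Long): Long =
  filesPerTask * bytesPerFile / 1000000L
```

For example, a task writing 100,000 files maps to roughly 100 MB of additional memory; scale `spark.executor.memory` accordingly for the number of such tasks an executor runs concurrently.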

# Enable the EMRFS S3-optimized committer for Amazon EMR 5.19.0
<a name="emr-spark-committer-enable"></a>

If you are using Amazon EMR 5.19.0, you can manually set the `spark.sql.parquet.fs.optimized.committer.optimization-enabled` property to `true` when you create a cluster or from within Spark.

## Enabling the EMRFS S3-optimized committer when creating a cluster
<a name="w2aac62c61c17c13b5"></a>

Use the `spark-defaults` configuration classification to set the `spark.sql.parquet.fs.optimized.committer.optimization-enabled` property to `true`. For more information, see [Configure applications](emr-configure-apps.md).
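
For example, the classification can be supplied as a configuration object when you create the cluster, using the standard Amazon EMR configuration JSON format:

```
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "true"
    }
  }
]
```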

## Enabling the EMRFS S3-optimized committer from Spark
<a name="w2aac62c61c17c13b7"></a>

You can set `spark.sql.parquet.fs.optimized.committer.optimization-enabled` to `true` by hard-coding it in a `SparkConf`, passing it as a `--conf` parameter in the Spark shell or `spark-submit` and `spark-sql` tools, or in `conf/spark-defaults.conf`. For more information, see [Spark configuration](https://spark.apache.org/docs/latest/configuration.html) in Apache Spark documentation.

The following example shows how to enable the committer while running a `spark-sql` command.

```
spark-sql \
  --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true \
  -e "INSERT OVERWRITE TABLE target_table SELECT * FROM source_table;"
```