

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Requisitos del protocolo de confirmación optimizado para S3 de EMRFS
<a name="emr-spark-commit-protocol-reqs"></a>

El protocolo de confirmación optimizado para S3 de EMRFS se utiliza cuando se cumplen las siguientes condiciones:
+ Ejecutas trabajos de Spark que usan Spark o Datasets para sobrescribir tablas particionadas. DataFrames
+ Ejecuta trabajos de Spark cuyo modo de sobrescritura de particiones es `dynamic`.
+ Las cargas multiparte están habilitadas en Amazon EMR. Esta es la opción predeterminada. Para obtener más información, consulte [El protocolo de confirmación optimizado para S3 de EMRFS y las cargas multiparte](emr-spark-commit-protocol-multipart.md). 
+ La caché del sistema de archivos para EMRFS está habilitada. Es el valor predeterminado. Compruebe que la configuración `fs.s3.impl.disable.cache` esté establecida en `false`. 
+ Se utiliza la compatibilidad de orígenes de datos integrados de Spark. La compatibilidad de orígenes de datos integrados se utiliza en las siguientes circunstancias:
  + Cuando los trabajos escriben en orígenes de datos o tablas integrados.
  + Cuando los trabajos escriben en tablas Parquet del metaalmacén de Hive. Esto sucede cuando `spark.sql.hive.convertInsertingPartitionedTable` y `spark.sql.hive.convertMetastoreParquet` se establecen en true. Esta es la configuración predeterminada.
  + Cuando los trabajos escriben en tablas ORC del metaalmacén de Hive. Esto sucede cuando `spark.sql.hive.convertInsertingPartitionedTable` y `spark.sql.hive.convertMetastoreOrc` se establecen en `true`. Esta es la configuración predeterminada.
+ Las operaciones de trabajos de Spark que escriben en una ubicación de partición predeterminada (por ejemplo, `${table_location}/k1=v1/k2=v2/`) utilizan el protocolo de confirmación. El protocolo no se utiliza si una operación de trabajo escribe en una ubicación de partición personalizada, por ejemplo, si una ubicación de partición personalizada se establece con el comando `ALTER TABLE SQL`.
+ Se deben utilizar los valores siguientes para Spark:
  + `spark.sql.sources.commitProtocolClass` se debe establecer en `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`. Esta es la configuración predeterminada para las versiones 5.30.0 y posteriores y 6.2.0 y posteriores de Amazon EMR. 
  + La opción de escritura `partitionOverwriteMode` o `spark.sql.sources.partitionOverwriteMode` debe estar establecida en `dynamic`. El ajuste predeterminado es `static`.
**nota**  
La opción de escritura `partitionOverwriteMode` se introdujo en Spark 2.4.0. En el caso de la versión 2.3.2 de Spark, incluida con la versión 5.19.0 de Amazon EMR, establezca la propiedad `spark.sql.sources.partitionOverwriteMode`. 
  + Si los trabajos de Spark sobrescriben en la tabla Parquet del metaalmacén de Hive, `spark.sql.hive.convertMetastoreParquet`, `spark.sql.hive.convertInsertingPartitionedTable` y `spark.sql.hive.convertMetastore.partitionOverwriteMode` debe establecerse en `true`. Esta es la configuración predeterminada. 
  + Si los trabajos de Spark sobrescriben en la tabla ORC del metaalmacén de Hive, `spark.sql.hive.convertMetastoreOrc`, `spark.sql.hive.convertInsertingPartitionedTable` y `spark.sql.hive.convertMetastore.partitionOverwriteMode` debe establecerse en `true`. Esta es la configuración predeterminada.

**Example – Modo de sobrescritura de partición dinámica**  
En este ejemplo de Scala, se activa la optimización. En primer lugar, establece la propiedad `partitionOverwriteMode` en `dynamic`. Esto solo sobrescribe las particiones en las que escribe datos. A continuación, especifica las columnas de particiones dinámicas mediante `partitionBy` y establece el modo de escritura en `overwrite`.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## Cuando no se utiliza el protocolo de confirmación optimizado para S3 de EMRFS
<a name="emr-spark-commit-protocol-reqs-anti"></a>

Por lo general, el protocolo de confirmación optimizado para S3 de EMRFS funciona igual que el protocolo de confirmación de Spark predeterminado de código abierto, `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`. La optimización no se producirá en las siguientes situaciones.


****  

| Situación | Por qué no se utiliza el protocolo de confirmación | 
| --- | --- | 
| Cuando escribe en HDFS | El protocolo de confirmación solo admite la escritura en Amazon S3 mediante EMRFS. | 
| Cuando utiliza el sistema de archivos S3A | El protocolo de confirmación solo admite EMRFS. | 
| Cuando utilizas la MapReduce API RDD de Spark | El protocolo de confirmación solo admite el uso de SparkSQL o DataFrame Dataset. APIs | 
| Cuando no se activa la sobrescritura de particiones dinámicas | El protocolo de confirmación solo optimiza los casos de sobrescritura de particiones dinámicas. Para los demás casos, consulte [Uso del confirmador optimizado para S3 de EMRFS](emr-spark-s3-optimized-committer.md). | 

En los siguientes ejemplos de Scala se muestran algunas situaciones adicionales en las que el protocolo de confirmación optimizado para S3 de EMRFS delega en `SQLHadoopMapReduceCommitProtocol`.

**Example – Modo de sobrescritura de particiones dinámica con ubicación de partición personalizada**  
En este ejemplo, los programas de Scala sobrescriben dos particiones en el modo de sobrescritura de particiones dinámicas. Una partición tiene una ubicación de partición personalizada. La otra partición utiliza la ubicación de partición predeterminada. El protocolo de confirmación optimizado para S3 de EMRFS solo mejora la partición que utiliza la ubicación de partición predeterminada.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
El código de Scala crea los siguientes objetos de Amazon S3:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
Escribir en ubicaciones de particiones personalizadas en versiones anteriores de Spark puede provocar la pérdida de datos. En este ejemplo, la partición `dt='2019-01-28'` se perdería. Para obtener más información, consulte [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106). Esto se corrigió en la versión 5.33.0 y posteriores de Amazon EMR, excluidas las 6.0.x y 6.1.x.

Al escribir en particiones de ubicaciones personalizadas, Spark utiliza un algoritmo de confirmación similar al del ejemplo anterior, que se detalla a continuación. Como ocurre en el ejemplo anterior, el algoritmo da como resultado cambios de nombre secuenciales, lo que puede afectar negativamente al rendimiento.

El algoritmo de Spark 2.4.0 sigue estos pasos:

1. Al escribir la salida en una partición de una ubicación personalizada, las tareas escriben en un archivo del directorio de ensayo de Spark, que se crea en la ubicación de salida final. El nombre del archivo incluye un UUID aleatorio para proteger contra las colisiones de archivo. La tarea intenta mantener un seguimiento de cada archivo junto con la ruta de salida deseada final.

1. Cuando una tarea se completa de forma satisfactoria, proporciona al controlador los archivos y las rutas de salida deseada final.

1. Una vez completadas todas las tareas, la fase de confirmación de trabajo cambia el nombre de forma secuencial de todos los archivos que se escribieron para particiones en ubicaciones personalizadas en sus rutas de salida finales.

1. El directorio de ensayo se elimina antes de que se complete la fase de confirmación de trabajo.