

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Requisitos do protocolo de confirmação otimizado para EMRFS S3
<a name="emr-spark-commit-protocol-reqs"></a>

O protocolo de confirmação otimizado para EMRFS S3 é usado quando as seguintes condições são atendidas:
+ Você executa trabalhos do Spark que usam o Spark ou conjuntos de dados para substituir tabelas particionadas. DataFrames
+ Você executa trabalhos do Spark cujo modo de substituição de partição é `dynamic`.
+ Carregamentos multipart estão habilitados no Amazon EMR. Esse é o padrão. Para obter mais informações, consulte [O protocolo de confirmação otimizado para EMRFS S3 e carregamentos multipart](emr-spark-commit-protocol-multipart.md). 
+ O cache do sistema de arquivos para o EMRFS está habilitado. Esse é o padrão. Verifique se a configuração `fs.s3.impl.disable.cache` está definida como `false`. 
+ O suporte integrado de fonte de dados do Spark é usado. O suporte integrado ao Parquet é usado nas seguintes circunstâncias:
  + Quando os trabalhos gravam em fontes de dados ou tabelas integradas.
  + Quando os trabalhos gravam em tabelas do Parquet do metastore do Hive. Isso acontece quando `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastoreParquet` são definidos como verdadeiros. Essas são as configurações padrão.
  + Quando os trabalhos gravam na tabela do ORC do metastore do Hive. Isso acontece quando `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastoreOrc` são definidos como `true`. Essas são as configurações padrão.
+ As operações de trabalhos do Spark que gravam em um local de partição padrão, por exemplo, `${table_location}/k1=v1/k2=v2/`, usam o confirmador. O protocolo não será usado se uma operação de trabalho gravar em um local de partição personalizado, por exemplo, se o local de uma partição personalizado for definido usando o comando `ALTER TABLE SQL`.
+ Os valores a seguir para o Spark devem ser usados:
  + `spark.sql.sources.commitProtocolClass` deve ser definido como `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`. Essa é a configuração padrão para as versões 5.30.0 e superiores e 6.2.0 e superiores do Amazon EMR. 
  + A opção de gravação `partitionOverwriteMode` ou `spark.sql.sources.partitionOverwriteMode` deve ser definida como `dynamic`. A configuração padrão é `static`.
**nota**  
A opção de gravação `partitionOverwriteMode` foi introduzida no Spark 2.4.0. Para a versão 2.3.2 do Spark, incluída com a versão 5.19.0 do Amazon EMR, defina a propriedade `spark.sql.sources.partitionOverwriteMode`. 
  + Se os trabalhos do Spark substituírem a tabela do Parquet do metastore do Hive, `spark.sql.hive.convertMetastoreParquet`, `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastore.partitionOverwriteMode` deverão ser configurados como `true`. Existem as configurações padrão. 
  + Se os trabalhos do Spark substituírem a tabela do ORC do metastore do Hive, `spark.sql.hive.convertMetastoreOrc`, `spark.sql.hive.convertInsertingPartitionedTable` e `spark.sql.hive.convertMetastore.partitionOverwriteMode` deverão ser configurados como `true`. Existem as configurações padrão.

**Example – Modo de substituição de partição dinâmica**  
Neste exemplo do Scala, a otimização é acionada. Primeiro, você define a propriedade `partitionOverwriteMode` como `dynamic`. Isso só substitui as partições nas quais você está gravando dados. Em seguida, você especifica as colunas de partição dinâmica com `partitionBy` e define o modo de gravação como `overwrite`.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## Quando o protocolo de confirmação otimizado para EMRFS S3 não é usado
<a name="emr-spark-commit-protocol-reqs-anti"></a>

Geralmente, o protocolo de confirmação otimizado para EMRFS S3 funciona da mesma forma que o protocolo de confirmação padrão do Spark de código aberto, `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol`. A otimização não ocorrerá nas situações a seguir.


****  

| Situação | Por que o protocolo de confirmação não é usado | 
| --- | --- | 
| Quando você grava no HDFS | O protocolo de confirmação só permite gravação no Amazon S3 com o uso do EMRFS. | 
| Quando você usa o sistema de arquivos S3A | O protocolo de confirmação só é compatível com EMRFS. | 
| Quando você usa MapReduce nossa API RDD do Spark | O protocolo de confirmação só oferece suporte ao uso de SparkSQL ou Dataset DataFrame. APIs | 
| Quando a substituição da partição dinâmica não é acionada | O protocolo de confirmação só otimiza os casos de substituição de partição dinâmica. Para outros casos, consulte [Usar o confirmador otimizado para EMRFS S3](emr-spark-s3-optimized-committer.md). | 

Os exemplos de Scala a seguir demonstram algumas situações adicionais que o protocolo de confirmação otimizado para EMRFS S3 delega para `SQLHadoopMapReduceCommitProtocol`.

**Example – Modo de substituição de partição dinâmica com local de partição personalizado**  
Neste exemplo, os programas Scala sobrescrevem duas partições no modo de substituição dinâmica de partição. Uma partição tem um local de partição personalizado. A outra partição usa o local de partição padrão. O protocolo de confirmação otimizado para EMRFS S3 só melhora a partição que usa o local de partição padrão.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
O código Scala cria os seguintes objetos do Amazon S3:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
Gravar em locais de partição personalizados em versões anteriores do Spark pode resultar em perda de dados. Neste exemplo, a partição `dt='2019-01-28'` seria perdida. Para obter mais detalhes, consulte [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106). Isso foi corrigido nas versões 5.33.0 e posteriores do Amazon EMR, excluindo 6.0.x e 6.1.x.

Ao gravar em partições em locais personalizados, o Spark usa um algoritmo de confirmação semelhante ao exemplo anterior, que é descrito abaixo. Como no exemplo anterior, o algoritmo resulta em renomeações sequenciais, o que pode afetar negativamente a performance.

O algoritmo no Spark 2.4.0 segue estas etapas:

1. Ao gravar a saída em uma partição em um local personalizado, as tarefas gravam em um arquivo no diretório de preparação do Spark, que é criado no local de saída final. O nome do arquivo inclui um UUID aleatório para evitar colisões de nomes de arquivos. A tentativa de tarefa controla cada arquivo junto com o caminho de saída final desejado.

1. Quando uma tarefa é concluída com êxito, ela fornece o driver com os arquivos e os caminhos desejados de saída final.

1. Depois que todas as tarefas forem concluídas, a fase de confirmação do trabalho renomeará sequencialmente todos os arquivos que foram gravados para partições em locais personalizados em seus caminhos de saída final.

1. O diretório de preparação é excluído antes de a fase de confirmação de trabalho ser concluída.