

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Persyaratan untuk protokol komit yang dioptimalkan EMRFS S3
<a name="emr-spark-commit-protocol-reqs"></a>

Protokol komit yang dioptimalkan EMRFS S3 digunakan ketika kondisi berikut terpenuhi:
+ Anda menjalankan pekerjaan Spark yang menggunakan Spark, DataFrames, atau Datasets untuk menimpa tabel yang dipartisi.
+ Anda menjalankan pekerjaan Spark yang mode timpa partisi. `dynamic`
+ Multipart upload diaktifkan di Amazon EMR. Ini adalah opsi default. Untuk informasi selengkapnya, lihat [Protokol komit yang dioptimalkan EMRFS S3 dan unggahan multipart](emr-spark-commit-protocol-multipart.md). 
+ Cache sistem file untuk EMRFS diaktifkan. Ini adalah opsi default. Periksa apakah pengaturan `fs.s3.impl.disable.cache` diatur ke`false`. 
+ Dukungan sumber data bawaan Spark digunakan. Dukungan sumber data bawaan digunakan dalam keadaan berikut:
  + Ketika pekerjaan menulis ke sumber data bawaan atau tabel.
  + Ketika pekerjaan menulis ke meja Parket Hive metastore. Ini terjadi ketika `spark.sql.hive.convertInsertingPartitionedTable` dan `spark.sql.hive.convertMetastoreParquet` keduanya disetel ke true. Ini adalah pengaturan default.
  + Ketika pekerjaan menulis ke tabel ORC metastore Hive. Ini terjadi ketika `spark.sql.hive.convertInsertingPartitionedTable` dan `spark.sql.hive.convertMetastoreOrc` keduanya diatur ke`true`. Ini adalah pengaturan default.
+ Spark job operations yang menulis ke lokasi partisi default — misalnya, `${table_location}/k1=v1/k2=v2/` — menggunakan protokol commit. Protokol tidak digunakan jika operasi pekerjaan menulis ke lokasi partisi kustom - misalnya, jika lokasi partisi kustom diatur menggunakan `ALTER TABLE SQL` perintah.
+ Nilai berikut untuk Spark mesti digunakan:
  + `spark.sql.sources.commitProtocolClass` harus diatur ke `org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol`. Ini adalah pengaturan default untuk Amazon EMR rilis 5.30.0 dan lebih tinggi, dan 6.2.0 dan lebih tinggi. 
  + Opsi `partitionOverwriteMode` tulis atau `spark.sql.sources.partitionOverwriteMode` harus diatur ke`dynamic`. Pengaturan default-nya adalah `static`.
**catatan**  
Parameter `partitionOverwriteMode` menulis opsi diperkenalkan di Spark 2.4.0. Untuk Spark versi 2.3.2, disertakan dengan rilis Amazon EMR 5.19.0, mengatur `spark.sql.sources.partitionOverwriteMode` properti. 
  + Jika pekerjaan Spark menimpa ke meja Parket Hive metastore,, `spark.sql.hive.convertMetastoreParquet``spark.sql.hive.convertInsertingPartitionedTable`, dan harus diatur ke. `spark.sql.hive.convertMetastore.partitionOverwriteMode` `true` Ada pengaturan default. 
  + Jika pekerjaan Spark menimpa ke tabel ORC metastore Hive,, `spark.sql.hive.convertMetastoreOrc``spark.sql.hive.convertInsertingPartitionedTable`, dan harus disetel ke. `spark.sql.hive.convertMetastore.partitionOverwriteMode` `true` Ada pengaturan default.

**Example — Mode penimpaan partisi dinamis**  
Dalam contoh Scala ini, optimasi dipicu. Pertama, Anda mengatur `partitionOverwriteMode` properti ke`dynamic`. Ini hanya menimpa partisi yang Anda tulis data. Kemudian, Anda menentukan kolom partisi dinamis dengan `partitionBy` dan mengatur mode tulis ke`overwrite`.  

```
val dataset = spark.range(0, 10)
  .withColumn("dt", expr("date_sub(current_date(), id)"))

dataset.write.mode("overwrite")                 // "overwrite" instead of "insert"
  .option("partitionOverwriteMode", "dynamic")  // "dynamic" instead of "static"  
  .partitionBy("dt")                            // partitioned data instead of unpartitioned data
  .parquet("s3://amzn-s3-demo-bucket1/output")    // "s3://" to use Amazon EMR file system, instead of "s3a://" or "hdfs://"
```

## Ketika protokol komit yang dioptimalkan EMRFS S3 tidak digunakan
<a name="emr-spark-commit-protocol-reqs-anti"></a>

Umumnya, protokol komit yang dioptimalkan EMRFS S3 bekerja sama dengan protokol komit Spark default open source,. `org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol` Optimasi tidak akan terjadi dalam situasi berikut.


****  

| Situasi | Mengapa protokol komit tidak digunakan | 
| --- | --- | 
| Saat Anda menulis ke HDFS | Protokol komit hanya mendukung penulisan ke Amazon S3 menggunakan EMRFS. | 
| Saat Anda menggunakan sistem file S3A | Protokol komit hanya mendukung EMRFS. | 
| Saat Anda menggunakan MapReduce atau API RDD Spark | Protokol commit hanya mendukung penggunaan DataFrame SparkSQL,, atau Dataset. APIs | 
| Saat penimpaan partisi dinamis tidak dipicu | Protokol komit hanya mengoptimalkan kasus penimpaan partisi dinamis. Untuk kasus lain, lihat[Gunakan committer yang dioptimalkan EMRFS S3](emr-spark-s3-optimized-committer.md). | 

Contoh Scala berikut menunjukkan beberapa situasi tambahan yang didelegasikan oleh protokol komit yang dioptimalkan EMRFS S3. `SQLHadoopMapReduceCommitProtocol`

**Example - Mode penimpaan partisi dinamis dengan lokasi partisi khusus**  
Dalam contoh ini, program Scala menimpa dua partisi dalam mode penimpaan partisi dinamis. Satu partisi memiliki lokasi partisi kustom. Partisi lain menggunakan lokasi partisi default. Protokol komit yang dioptimalkan EMRFS S3 hanya meningkatkan partisi yang menggunakan lokasi partisi default.  

```
val table = "dataset"
val inputView = "tempView"
val location = "s3://bucket/table"
                            
spark.sql(s"""
  CREATE TABLE $table (id bigint, dt date) 
  USING PARQUET PARTITIONED BY (dt) 
  LOCATION '$location'
""")

// Add a partition using a custom location
val customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
  ALTER TABLE $table ADD PARTITION (dt='2019-01-28') 
  LOCATION '$customPartitionLocation'
""")

// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")

def asDate(text: String) = lit(text).cast("date")   
                       
spark.range(0, 10)
  .withColumn("dt",
    when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
  .createTempView(inputView)
  
// Set partition overwrite mode to 'dynamic'
spark.sql(s"SET spark.sql.sources.partitionOverwriteMode=dynamic")
  
spark.sql(s"INSERT OVERWRITE TABLE $table SELECT * FROM $inputView")
```
Kode Scala menciptakan objek Amazon S3 berikut:  

```
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
```
Menulis ke lokasi partisi khusus di versi Spark sebelumnya dapat mengakibatkan kehilangan data. Dalam contoh ini, partisi `dt='2019-01-28'` akan hilang. Untuk lebih jelasnya, lihat [SPARK-35106](https://issues.apache.org/jira/browse/SPARK-35106). Ini diperbaiki di Amazon EMR rilis 5.33.0 dan yang lebih baru, tidak termasuk 6.0.x dan 6.1.x.

Ketika menulis ke partisi di lokasi kustom, Spark menggunakan algoritma komit mirip dengan contoh sebelumnya, yang diuraikan di bawah ini. Seperti contoh sebelumnya, algoritme menghasilkan penggantian nama berurutan, yang dapat berdampak negatif pada kinerja.

Algoritma di Spark 2.4.0 mengikuti langkah-langkah berikut:

1. Ketika menulis output ke partisi di lokasi kustom, tugas menulis ke file di bawah direktori pementasan Spark ini, yang dibuat di bawah lokasi output akhir. Nama file termasuk UUID acak untuk melindungi terhadap tabrakan file. Upaya tugas melacak setiap file bersama dengan path output akhir yang diinginkan.

1. Ketika tugas selesai berhasil, menyediakan driver dengan file dan akhir yang diinginkan output jalan mereka.

1. Setelah semua tugas selesai, pekerjaan commit fase berurutan mengganti nama semua file yang ditulis untuk partisi di lokasi kustom ke jalur output akhir mereka.

1. Direktori pementasan dihapus sebelum pekerjaan komit fase selesai.