

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 平行處理任務
<a name="parallelize-tasks"></a>

若要最佳化效能，請務必平行處理資料載入和轉換的任務。正如我們在 [Apache Spark 中討論的關鍵主題](key-topics-apache-spark.md)所述，彈性分散式資料集 (RDD) 分割區的數量很重要，因為它決定平行處理的程度。Spark 建立的每個任務都會以 1：1 為基礎對應至 RDD 分割區。若要獲得最佳效能，您需要了解如何決定 RDD 分割區的數量，以及如何最佳化該數量。

如果您沒有足夠的平行處理，以下症狀將記錄在 [CloudWatch 指標](https://docs.aws.amazon.com/glue/latest/dg/monitoring-awsglue-with-cloudwatch-metrics.html)和 Spark UI 中。

## CloudWatch 指標
<a name="parallelize-metrics"></a>

檢查 **CPU 負載**和**記憶體使用率**。如果某些執行器未在任務的某個階段期間處理，則適合改善平行處理。在此情況下，在視覺化時間範圍內，**執行器 1** 正在執行任務，但剩餘的執行器 (2、3 和 4) 並未執行。您可以推斷 Spark 驅動程式未指派這些執行器的任務。



![\[圖表僅顯示驅動程式和一個執行器。\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/cpu-load.png)


## Spark 使用者介面
<a name="parallelize-spark"></a>

在 Spark UI 的**階段**索引標籤上，您可以查看階段中的任務** **數量。* *在此情況下，Spark 只執行一個任務。



![\[""\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/stage-tasks.png)


此外，事件時間軸會顯示**執行器 1** 處理一個任務。這表示此階段中的工作完全在某個執行器上執行，而其他執行器則處於閒置狀態。



![\[僅顯示一個任務的事件時間軸。\]](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/event-timeline-2.png)


如果您觀察到這些症狀，請嘗試每個資料來源的下列解決方案。

### 從 Amazon S3 平行載入資料
<a name="parallelize-data-load"></a>

若要平行處理來自 Amazon S3 的資料載入，請先檢查預設的分割區數量。然後，您可以手動判斷分割區的目標數量，但請務必避免擁有太多分割區。

*決定預設的分割區數量*

對於 Amazon S3，初始數量的 Spark RDD 分割區 （每個分割區對應至 Spark 任務） 取決於 Amazon S3 資料集的功能 （例如，格式、壓縮和大小）。當您從存放在 Amazon S3 中的 CSV 物件建立 an AWS Glue DynamicFrame 或 Spark DataFrame 時，初始 RDD 分割區數量 (`NumPartitions`) 可大致如下計算：
+ 物件大小 <= 64 MB： `NumPartitions = Number of Objects`
+ 物件大小 > 64 MB： `NumPartitions = Total Object Size / 64 MB`
+ 不可分割 (gzip)： `NumPartitions = Number of Objects`

如[減少資料掃描量](reduce-data-scan.md)一節所述，Spark 會將大型 S3 物件分成可平行處理的分割。當物件大於分割大小時，Spark 會分割物件，並為每個分割建立 RDD 分割區 （和任務）。Spark 的分割大小取決於您的資料格式和執行時間環境，但這是合理的開始近似值。有些物件是使用 gzip 等不可分割的壓縮格式進行壓縮，因此 Spark 無法分割它們。

`NumPartitions` 值可能會根據您的資料格式、壓縮、 AWS Glue 版本、 AWS Glue 工作者數量和 Spark 組態而有所不同。

例如，當您使用 Spark DataFrame 載入單一 10 GB `csv.gz` 物件時，Spark 驅動程式只會建立一個 RDD 分割區 (`NumPartitions=1`)，因為 gzip 是不可分割的。這會導致一個特定 Spark 執行器的繁重負載，並且不會將任何任務指派給剩餘的執行器，如下圖所述。

在 [Spark Web UI 階段索引標籤上檢查](https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui.html)**階段**的實際任務數量 (`NumPartitions`)，或在程式碼`df.rdd.getNumPartitions()`中執行 以檢查平行處理。

遇到 10 GB gzip 檔案時，請檢查產生該檔案的系統是否可以以可分割格式產生該檔案。如果這不是選項，您可能需要[擴展叢集容量](scale-cluster-capacity.md)來處理檔案。若要有效對您載入的資料執行轉換，您需要使用重新分割來重新平衡叢集中整個工作者的 RDD。

*手動判斷分割區的目標數量*

視資料屬性和 Spark 實作特定功能而定，即使基礎工作仍可平行化，最終仍可能具有較低的`NumPartitions`值。如果 `NumPartitions` 太小，請執行 `df.repartition(N)` 以增加分割區數量，以便將處理分散到多個 Spark 執行器。

在這種情況下，執行 `NumPartitions` `df.repartition(100)`會從 1 增加到 100，建立 100 個資料分割區，每個分割區都有可指派給其他執行器的任務。

操作會將整個資料平均`repartition(N)`分割 (10 GB/100 個分割區 = 100 MB/分割區），避免資料扭曲至特定分割區。

**注意**  
`join` 執行 等隨機播放操作時，分割區的數量會根據 `spark.sql.shuffle.partitions`或 的值動態增加或減少`spark.default.parallelism`。這有助於在 Spark 執行器之間更有效率地交換資料。如需詳細資訊，請參閱 [Spark 文件](https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration)。

您在決定分割區的目標數量時，目標是最大化佈建 AWS Glue 工作者的使用率。 AWS Glue 工作者數量和 Spark 任務數量與 vCPUs 數量相關。Spark 支援每個 vCPU 核心一個任務。在 3.0 AWS Glue 版或更新版本中，您可以使用下列公式計算分割區的目標數量。

```
# Calculate NumPartitions by WorkerType
numExecutors = (NumberOfWorkers - 1)
numSlotsPerExecutor = 
  4 if WorkerType is G.1X
  8 if WorkerType is G.2X
  16 if WorkerType is G.4X
  32 if WorkerType is G.8X
NumPartitions = numSlotsPerExecutor * numExecutors

# Example: Glue 4.0 / G.1X / 10 Workers
numExecutors = ( 10 - 1 ) = 9  # 1 Worker reserved on Spark Driver
numSlotsPerExecutor       = 4  # G.1X has 4 vCpu core ( Glue 3.0 or later )
NumPartitions = 9  * 4    = 36
```

在此範例中，每個 G.1X 工作者都會提供四個 vCPU 核心給 Spark 執行器 (`spark.executor.cores = 4`)。Spark 為每個 vCPU Core 支援一個任務，因此 G.1X Spark 執行器可以同時執行四個任務 (`numSlotPerExecutor`)。如果任務需要相同的時間量，則此分割區數量會充分利用叢集。不過，某些任務需要比其他任務更長的時間，才能建立閒置核心。如果發生這種情況，請考慮`numPartitions`將 2 或 3 相乘，以分解並有效率地排程瓶頸任務。

*太多分割區*

過多的分割區會建立過多的任務。這會導致 Spark 驅動程式負載過重，因為與分散式處理相關的額外負荷，例如管理任務和 Spark 執行器之間的資料交換。

如果您任務中的分割區數量遠大於您的分割區目標數量，請考慮減少分割區數量。您可以使用下列選項來減少分割區：
+ 如果您的檔案大小很小，請使用 AWS Glue [groupFiles](https://docs.aws.amazon.com/glue/latest/dg/grouping-input-files.html)。您可以減少啟動 Apache Spark 任務以處理每個檔案所產生的過度平行處理。
+ 使用 `coalesce(N)`將分割區合併在一起。這是一個低成本的程序。減少分割區數量時， `coalesce(N)` 優先於 `repartition(N)`，因為 會`repartition(N)`執行隨機播放來平均分配每個分割區中的記錄數量。這會增加成本和管理開銷。
+ 使用 Spark 3.x 自適應查詢執行。如 [Apache Spark 章節中的關鍵主題](key-topics-apache-spark.md)所述，自適應查詢執行會提供函數，以自動合併分割區的數量。當您在執行執行之前不知道分割區數量時，可以使用此方法。

### 從 JDBC 平行載入資料
<a name="parallelize-data-load-jdbc"></a>

Spark RDD 分割區的數量取決於組態。請注意，根據預設，只會執行單一任務來透過`SELECT`查詢掃描整個來源資料集。

 AWS Glue DynamicFrames 和 Spark DataFrames 都支援跨多個任務的平行 JDBC 資料載入。這可透過使用`where`述詞將一個`SELECT`查詢分割為多個查詢來完成。若要平行化 JDBC 的讀取，請設定下列選項：
+ For AWS Glue DynamicFrame，設定 `hashfield`（或 `hashexpression)`和 `hashpartition`。若要進一步了解，請參閱[平行讀取 JDBC 資料表](https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html)。

  ```
  connection_mysql8_options = {
      "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test",
      "dbtable": "medicare_tb",
      "user": "test",
      "password": "XXXXXXXXX",
      "hashexpression":"id",
      "hashpartitions":"10"
  }
  datasource0 = glueContext.create_dynamic_frame.from_options(
      'mysql', 
      connection_options=connection_mysql8_options,
      transformation_ctx= "datasource0"
  )
  ```
+ 針對 Spark DataFrame，設定 `numPartitions`、`lowerBound`、 `partitionColumn`和 `upperBound`。若要進一步了解，請參閱 [JDBC 至其他資料庫](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)。

  ```
  df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \
      .option("dbtable", "medicare_tb") \
      .option("user", "test") \
      .option("password", "XXXXXXXXXX") \
      .option("partitionColumn", "id") \
      .option("numPartitions", "10") \
      .option("lowerBound", "0") \
      .option("upperBound", "1141455") \
      .load()
  
  df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")
  ```

### 使用 ETL 連接器時，從 DynamoDB 平行載入資料
<a name="dynamodb-etl-connector"></a>

Spark RDD 分割區的數量由 `dynamodb.splits` 參數決定。若要平行處理來自 Amazon DynamoDB 的讀取，請設定下列選項：
+ 增加 的值`dynamodb.splits`。
+ 依照 for [Spark 中 ETL 的連線類型和選項中說明 AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-dynamodb)的公式來最佳化 參數。

### 從 Kinesis Data Streams 平行載入資料
<a name="kinesis-data-streams"></a>

Spark RDD 分割區的數量取決於來源 Amazon Kinesis Data Streams 資料串流中的碎片數量。如果您的資料串流中只有幾個碎片，則只會有幾個 Spark 任務。這可能會導致下游程序的平行處理率低。若要平行化 Kinesis Data Streams 的讀取，請設定下列選項：
+ 從 Kinesis Data Streams 載入資料時，增加碎片數量以取得更多平行處理。
+ 如果您在微型批次中的邏輯夠複雜，請考慮在捨棄不需要的資料欄之後，在批次開始時重新分割資料。

如需詳細資訊，請參閱[最佳化 AWS Glue 串流 ETL 任務成本和效能的最佳實務](https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/)。

### 資料載入後平行處理任務
<a name="parallelize-after"></a>

若要在資料載入後平行處理任務，請使用下列選項來增加 RDD 分割區的數量：
+ 重新分割資料以產生更多分割區，特別是如果負載本身無法平行化，則在初始載入之後。

  在 DynamicFrame 或 DataFrame `repartition()`上呼叫 ，指定分割區的數量。良好的經驗法則是可用核心數目的兩倍或三倍。

  不過，寫入分割的資料表時，這可能會導致檔案爆炸 （每個分割區可能會在每個資料表分割區中產生檔案）。若要避免這種情況，您可以依資料欄重新分割 DataFrame。這會使用資料表分割區資料欄，以便在寫入之前整理資料。您可以指定更多分割區，而不會在資料表分割區上取得小型檔案。不過，請小心避免資料扭曲，其中某些分割區值最終會變成大部分的資料，並延遲任務的完成。
+ 有隨機播放時，請增加 `spark.sql.shuffle.partitions`值。這也有助於解決隨機播放時的任何記憶體問題。

  當您有超過 2，001 個隨機分割區時，Spark 會使用壓縮的記憶體格式。如果您的數字接近此值，您可能想要設定超過該限制`spark.sql.shuffle.partitions`的值，以取得更有效率的表示法。