

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 태스크 병렬화
<a name="parallelize-tasks"></a>

성능을 최적화하려면 데이터 로드 및 변환에 대한 태스크를 병렬화하는 것이 중요합니다. [Apache Spark의 주요 주제](key-topics-apache-spark.md)에서 설명한 것처럼 복원력이 뛰어난 분산 데이터세트(RDD) 파티션의 수는 병렬화 수준을 결정하기 때문에 중요합니다. Spark가 생성하는 각 태스크는 1:1 기준으로 RDD 파티션에 해당합니다. 최상의 성능을 얻으려면 RDD 파티션 수를 결정하는 방법과 해당 수를 최적화하는 방법을 이해해야 합니다.

병렬 처리가 충분하지 않은 경우 [CloudWatch 지표](https://docs.aws.amazon.com/glue/latest/dg/monitoring-awsglue-with-cloudwatch-metrics.html) 및 Spark UI에 다음 증상이 기록됩니다.

## CloudWatch 지표
<a name="parallelize-metrics"></a>

**CPU 로드** 및 **메모리 사용률**을 확인합니다. 작업 단계에서 일부 실행기가 처리 중이 아닌 경우 병렬 처리를 개선하는 것이 좋습니다. 이 경우 시각화된 기간에 **실행기 1**이 태스크를 수행했지만 나머지 실행기(2, 3, 4)는 수행하지 않았습니다. Spark 드라이버에서 해당 실행기에 태스크가 할당되지 않았다고 추론할 수 있습니다.



![드라이버와 하나의 실행기만 보여주는 그래프.](http://docs.aws.amazon.com/ko_kr/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/cpu-load.png)


## Spark UI
<a name="parallelize-spark"></a>

Spark UI의 **Stage** 탭에서 단계의 *태스크 수*를** **볼 수 있습니다. 이 경우 Spark는 하나의 태스크만 수행했습니다.



![""](http://docs.aws.amazon.com/ko_kr/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/stage-tasks.png)


또한 이벤트 타임라인에는 **실행기 1**이 하나의 태스크를 처리하는 것으로 표시됩니다. 즉, 이 단계의 작업은 하나의 실행기에서 완전히 수행되었고 다른 실행기는 유휴 상태였습니다.



![하나의 태스크만 보여주는 이벤트 타임라인.](http://docs.aws.amazon.com/ko_kr/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/event-timeline-2.png)


이러한 증상이 관찰되면 각 데이터 소스에 대해 다음 솔루션을 시도해 보세요.

### Amazon S3에서 데이터 로드 병렬화
<a name="parallelize-data-load"></a>

Amazon S3에서 데이터 로드를 병렬화하려면 먼저 기본 파티션 수를 확인합니다. 그런 다음 대상 파티션 수를 수동으로 결정할 수 있지만 파티션이 너무 많지 않도록 해야 합니다.

*기본 파티션 수 결정*

Amazon S3의 경우 초기 Spark RDD 파티션 수(각각 Spark 태스크에 해당)는 Amazon S3 데이터세트의 기능(예: 형식, 압축 및 크기)에 따라 결정됩니다. Amazon S3에 저장된 CSV 객체에서 AWS Glue DynamicFrame 또는 Spark DataFrame을 생성하면 다음과 같이 초기 RDD 파티션(`NumPartitions`) 수를 대략적으로 계산할 수 있습니다.
+ 객체 크기가 64MB 이하인 경우: `NumPartitions = Number of Objects`
+ 객체 크기가 64MB를 초과하는 경우: `NumPartitions = Total Object Size / 64 MB`
+ 분할 불가(gzip): `NumPartitions = Number of Objects`

[데이터 스캔 양 감소](reduce-data-scan.md) 섹션에서 설명한 대로 Spark는 큰 S3 객체를 병렬로 처리할 수 있는 분할로 나눕니다. 객체가 분할 크기보다 크면 Spark는 객체를 분할하고 각 분할에 대해 RDD 파티션(및 태스크)을 생성합니다. Spark의 분할 크기는 데이터 형식과 런타임 환경에 기반하지만 이는 합리적인 시작 근사치입니다. 일부 객체는 gzip과 같은 분할할 수 없는 압축 형식을 사용하여 압축되므로 Spark는 이를 분할할 수 없습니다.

`NumPartitions` 값은 데이터 형식, 압축, AWS Glue 버전, AWS Glue 작업자 수 및 Spark 구성에 따라 다를 수 있습니다.

예를 들어 Spark DataFrame을 사용하여 단일 10GB `csv.gz` 객체를 로드하면 gzip은 분할할 수 없으므로 Spark 드라이버는 하나의 RDD 파티션(`NumPartitions=1`)만 생성합니다. 따라서 다음 그림에 설명된 대로 하나의 특정 Spark 실행기에 과도한 로드가 생기고 나머지 실행기에 태스크가 할당되지 않습니다.

[Spark Web UI](https://docs.aws.amazon.com/glue/latest/dg/monitor-spark-ui.html) **단계** 탭에서 단계의 실제 태스크 수(`NumPartitions`)를 확인하거나 코드에서 `df.rdd.getNumPartitions()`를 실행하여 병렬화를 확인합니다.

10GB gzip 파일이 생기면 해당 파일을 생성하는 시스템이 분할 가능한 형식으로 파일을 생성할 수 있는지 검사합니다. 옵션이 아닌 경우 [클러스터 용량을 조정](scale-cluster-capacity.md)하여 파일을 처리해야 할 수 있습니다. 로드한 데이터에 대해 변환을 효율적으로 실행하려면 다시 분할을 사용하여 클러스터의 작업자 간에 RDD를 재조정해야 합니다.

*수동으로 대상 파티션 수 확인*

데이터의 속성과 Spark의 특정 기능 구현에 따라 기본 작업을 계속 병렬화할 수 있더라도 `NumPartitions` 값이 낮아질 수 있습니다. `NumPartitions`가 너무 작은 경우 `df.repartition(N)`을 실행하여 파티션 수를 늘리면 처리가 여러 Spark 실행기에 분산될 수 있습니다.

이 경우 `df.repartition(100)`을 실행하면 `NumPartitions`가 1에서 100으로 증가하고 각각 다른 실행기에 할당할 수 있는 태스크를 포함하는 100개의 데이터 파티션이 생성됩니다.

`repartition(N)` 작업은 전체 데이터를 균등하게 나누어(10GB/100개 파티션 = 100MB/파티션) 특정 파티션에 대한 데이터 스큐를 방지합니다.

**참고**  
`join`과 같은 셔플 작업이 실행되면 `spark.sql.shuffle.partitions` 또는 `spark.default.parallelism`의 값에 따라 파티션 수가 동적으로 증가하거나 감소합니다. 이를 통해 Spark 실행기 사이에서 데이터를 보다 효율적으로 교환할 수 있습니다. 자세한 내용은 [Spark 설명서](https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration)를 참조하세요.

파티션의 목표 수를 결정할 때 목표는 프로비저닝된 AWS Glue 작업자의 사용을 극대화하는 것입니다. AWS Glue 작업자 수와 Spark 작업 수는 vCPUs. Spark는 각 vCPU 코어에 대해 하나의 태스크를 지원합니다. AWS Glue 버전 3.0 이상에서는 다음 공식을 사용하여 대상 파티션 수를 계산할 수 있습니다.

```
# Calculate NumPartitions by WorkerType
numExecutors = (NumberOfWorkers - 1)
numSlotsPerExecutor = 
  4 if WorkerType is G.1X
  8 if WorkerType is G.2X
  16 if WorkerType is G.4X
  32 if WorkerType is G.8X
NumPartitions = numSlotsPerExecutor * numExecutors

# Example: Glue 4.0 / G.1X / 10 Workers
numExecutors = ( 10 - 1 ) = 9  # 1 Worker reserved on Spark Driver
numSlotsPerExecutor       = 4  # G.1X has 4 vCpu core ( Glue 3.0 or later )
NumPartitions = 9  * 4    = 36
```

이 예제에서 각 G.1X 작업자는 Spark 실행기(`spark.executor.cores = 4`)에 4개의 vCPU 코어를 제공합니다. Spark는 각 vCPU 코어에 대해 하나의 태스크를 지원하므로 G.1X Spark 실행기는 4개의 태스크(`numSlotPerExecutor`)를 동시에 실행할 수 있습니다. 태스크에 동일한 시간이 걸리는 경우 이 파티션 수는 클러스터를 완전히 사용합니다. 그러나 일부 태스크가 다른 태스크보다 오래 걸리므로 유휴 코어가 생성됩니다. 이 경우 병목 현상 태스크를 나누고 효율적으로 예약하려면 `numPartitions`에 2 또는 3을 곱하는 방법을 고려합니다.

*파티션이 너무 많음*

파티션 수가 너무 많으면 태스크 수가 너무 많습니다. 이로 인해 Spark 실행기 간의 관리 태스크 및 데이터 교환과 같은 분산 처리와 관련된 오버헤드 때문에 Spark 드라이버에 과부하가 발생합니다.

작업의 파티션 수가 목표 파티션 수보다 훨씬 큰 경우 파티션 수를 줄이는 방법을 고려합니다. 다음 옵션을 사용하여 파티션을 줄일 수 있습니다.
+ 파일 크기가 매우 작은 경우 AWS Glue [groupFiles](https://docs.aws.amazon.com/glue/latest/dg/grouping-input-files.html)를 사용합니다. Apache Spark 태스크 시작으로 인한 과도한 병렬화를 줄여 각 파일을 처리할 수 있습니다.
+ `coalesce(N)`를 사용하여 파티션을 병합합니다. 이는 비용이 저렴한 프로세스입니다. 파티션 수를 줄이면 `repartition(N)`에서 셔플을 수행하여 각 파티션에 균등하게 레코드의 양을 분산하므로 `coalesce(N)`가 `repartition(N)`보다 선호됩니다. 이 경우 비용과 관리 오버헤드가 증가합니다.
+ Spark 3.x 적응형 쿼리 실행을 사용합니다. [Apache Spark 섹션의 주요 주제](key-topics-apache-spark.md)에서 설명한 대로 적응형 쿼리 실행은 파티션 수를 자동으로 병합하는 함수를 제공합니다. 실행을 수행할 때까지 파티션 수를 알 수 없는 경우 이 접근 방식을 사용할 수 있습니다.

### JDBC에서 데이터 로드 병렬화
<a name="parallelize-data-load-jdbc"></a>

Spark RDD 파티션 수는 구성에 의해 결정됩니다. 기본적으로 `SELECT` 쿼리를 통해 전체 소스 데이터세트를 스캔하기 위해 단일 태스크만 실행됩니다.

 AWS Glue DynamicFrames과 Spark DataFrames 여러 작업에서 병렬화된 JDBC 데이터 로드를 지원합니다. 이는 `where` 조건자를 사용하여 하나의 `SELECT` 쿼리를 여러 쿼리로 분할하여 수행됩니다. JDBC에서 읽기를 병렬화하려면 다음 옵션을 구성합니다.
+ For AWS Glue DynamicFrame, 설정`hashfield`(또는 `hashexpression)` 및 `hashpartition`. 자세한 내용은 [JDBC 테이블을 병렬로 읽기](https://docs.aws.amazon.com/glue/latest/dg/run-jdbc-parallel-read-job.html)를 참조하세요.

  ```
  connection_mysql8_options = {
      "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test",
      "dbtable": "medicare_tb",
      "user": "test",
      "password": "XXXXXXXXX",
      "hashexpression":"id",
      "hashpartitions":"10"
  }
  datasource0 = glueContext.create_dynamic_frame.from_options(
      'mysql', 
      connection_options=connection_mysql8_options,
      transformation_ctx= "datasource0"
  )
  ```
+ Spark DataFrame의 경우 `numPartitions`, `partitionColumn`, `lowerBound`, `upperBound`를 설정합니다. 자세한 내용은 [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)를 참조하세요.

  ```
  df = spark.read \
      .format("jdbc") \
      .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \
      .option("dbtable", "medicare_tb") \
      .option("user", "test") \
      .option("password", "XXXXXXXXXX") \
      .option("partitionColumn", "id") \
      .option("numPartitions", "10") \
      .option("lowerBound", "0") \
      .option("upperBound", "1141455") \
      .load()
  
  df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")
  ```

### ETL 커넥터를 사용할 경우 DynamoDB에서 데이터 로드 병렬화
<a name="dynamodb-etl-connector"></a>

Spark RDD 파티션 수는 `dynamodb.splits` 파라미터에 의해 결정됩니다. Amazon DynamoDB에서 읽기를 병렬화하려면 다음 옵션을 구성합니다.
+ `dynamodb.splits`의 값을 늘립니다.
+ for [Spark의 ETL에 대한 연결 유형 및 옵션에 설명된 공식에 따라 파라미터를 최적화 AWS Glue 합니다](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-dynamodb).

### Kinesis Data Streams에서 데이터 로드 병렬화
<a name="kinesis-data-streams"></a>

Spark RDD 파티션 수는 소스 Amazon Kinesis Data Streams 데이터 스트림의 샤드 수에 의해 결정됩니다. 데이터 스트림에 샤드가 몇 개뿐인 경우 Spark 태스크는 몇 개뿐입니다. 이로 인해 다운스트림 프로세스에서 병렬화가 낮아질 수 있습니다. Kinesis Data Streams에서 읽기를 병렬화하려면 다음 옵션을 구성합니다.
+ Kinesis Data Streams에서 데이터를 로드할 경우 더 많은 병렬화를 얻으려면 샤드 수를 늘립니다.
+ 마이크로 배치의 로직이 충분히 복잡한 경우 불필요한 열을 삭제한 후 배치 시작 시 데이터를 재분할하는 방법을 고려합니다.

자세한 내용은 [AWS Glue 스트리밍 ETL 작업의 비용 및 성능을 최적화하는 모범 사례를 참조하세요](https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/).

### 데이터 로드 후 태스크 병렬화
<a name="parallelize-after"></a>

데이터 로드 후 태스크를 병렬화하려면 다음 옵션을 사용하여 RDD 파티션 수를 늘립니다.
+ 특히 로드 자체를 병렬화할 수 없는 경우 초기 로드 직후 더 많은 수의 파티션을 생성하기 위해 데이터를 다시 분할합니다.

  파티션 수를 지정하여 DynamicFrame 또는 DataFrame에서 `repartition()`을 직접 호출합니다. 일반적으로 사용 가능한 코어 수의 2\~3배가 적당합니다.

  그러나 분할된 테이블을 작성할 경우 파일이 크게 증가할 수 있습니다(각 파티션이 잠재적으로 각 테이블 파티션에 파일을 생성할 수 있음). 이를 방지하기 위해 열을 기준으로 DataFrame을 다시 분할할 수 있습니다. 이렇게 하면 쓰기 전에 데이터가 구성되도록 테이블 파티션 열이 사용됩니다. 테이블 파티션에서 작은 파일을 가져오지 않고도 더 많은 수의 파티션을 지정할 수 있습니다. 그러나 일부 파티션 값이 대부분의 데이터로 끝나고 작업 완료가 지연되는 데이터 스큐를 피하도록 주의하세요.
+ 셔플이 있는 경우 `spark.sql.shuffle.partitions` 값을 늘립니다. 또한 셔플 작업 시 메모리 문제를 해결하는 데 도움이 될 수 있습니다.

  2,001개가 넘는 셔플 파티션이 있는 경우 Spark는 압축된 메모리 형식을 사용합니다. 숫자가 이에 근사한 경우 보다 효율적인 표현을 얻기 위해 해당 한도를 초과하는 `spark.sql.shuffle.partitions` 값을 설정하려고 할 수 있습니다.