Configure Spark
You can configure Spark on Amazon EMR by using configuration classifications.
Configuration classifications for Spark on Amazon EMR include the following:
- spark – Sets the maximizeResourceAllocation property to true or false. When true, Amazon EMR automatically configures spark-defaults properties based on cluster hardware configuration. For more information, see Using maximizeResourceAllocation.
- spark-defaults – Sets values in the spark-defaults.conf file. For more information, see Spark configuration in the Spark documentation.
- spark-env – Sets values in the spark-env.sh file. For more information, see Environment variables in the Spark documentation.
- spark-hive-site – Sets values in the hive-site.xml for Spark.
- spark-log4j – (Amazon EMR releases 6.7.x and lower) Sets values in the log4j.properties file. For more information, see the log4j.properties.template file on GitHub.
- spark-log4j2 – (Amazon EMR releases 6.8.0 and higher) Sets values in the log4j2.properties file. For more information, see the log4j2.properties.template file on GitHub.
- spark-metrics – Sets values in the metrics.properties file. For settings and more information, see the metrics.properties.template file on GitHub, and Metrics in the Spark documentation.
Note
If you're migrating Spark workloads to Amazon EMR from another platform, we recommend that you test your workloads with the Spark defaults set by Amazon EMR before you add custom configurations. Most customers see improved performance with our default settings.
Spark defaults set by Amazon EMR
The following table shows how Amazon EMR sets default values in
spark-defaults that affect applications.
| Setting | Description | Default value |
|---|---|---|
| spark.executor.memory | The amount of memory to use per executor process. For example: 1g, 2g. | This setting is determined by the core and task instance types in the cluster. |
| spark.executor.cores | The number of cores to use on each executor. | This setting is determined by the core and task instance types in the cluster. |
| spark.dynamicAllocation.enabled | When true, use dynamic resource allocation to scale the number of executors registered with an application up and down based on the workload. | Note: Spark shuffle service is automatically configured by Amazon EMR. |
| spark.sql.hive.advancedPartitionPredicatePushdown.enabled | When true, advanced partition predicate pushdown into Hive metastore is enabled. | true |
| spark.sql.hive.stringLikePartitionPredicatePushdown.enabled | Pushes down [...] Note: Glue doesn't support predicate push down for [...] | true |
Configuring Spark garbage collection on Amazon EMR 6.1.0
On Amazon EMR 6.1.0, setting custom garbage collection configurations with
spark.driver.extraJavaOptions and spark.executor.extraJavaOptions causes the
driver or executor to fail at launch, because the custom settings conflict with
the default garbage collection configuration. For Amazon EMR 6.1.0, the default
garbage collection configuration is set through spark.driver.defaultJavaOptions
and spark.executor.defaultJavaOptions. This configuration applies only to
Amazon EMR 6.1.0. JVM options not related to garbage collection, such as those for
configuring logging (-verbose:class), can still be set through
extraJavaOptions. For more information, see Spark application properties.
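As a minimal sketch of the rule above, the following Python snippet builds a spark-defaults classification that passes a non-GC JVM option through extraJavaOptions and rejects options that look like garbage collector selectors. The helper names and the flag pattern (HotSpot-style -XX:+Use...GC switches) are illustrative assumptions, not an exhaustive list of garbage collection options and not part of Amazon EMR.

```python
import json
import re

# Assumption: collectors are selected with HotSpot flags such as
# -XX:+UseG1GC or -XX:+UseParallelGC. The pattern is illustrative only.
GC_FLAG = re.compile(r"^-XX:[+-]Use\w*GC$")


def find_gc_flags(java_options: str) -> list:
    """Return the tokens in a JVM option string that look like GC selectors."""
    return [tok for tok in java_options.split() if GC_FLAG.match(tok)]


def extra_java_options_config(java_options: str) -> list:
    """Build a spark-defaults classification for driver and executor options."""
    gc_flags = find_gc_flags(java_options)
    if gc_flags:
        # On Amazon EMR 6.1.0 these would conflict with the default GC settings.
        raise ValueError(f"GC options not allowed in extraJavaOptions: {gc_flags}")
    return [{
        "Classification": "spark-defaults",
        "Properties": {
            "spark.driver.extraJavaOptions": java_options,
            "spark.executor.extraJavaOptions": java_options,
        },
    }]


config = extra_java_options_config("-verbose:class")
print(json.dumps(config, indent=2))
```

Running a check like this before cluster creation is one way to catch the conflict early; it does not replace testing on the target release.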
Using maximizeResourceAllocation
To configure your executors to use the maximum resources possible on each node in
a cluster, set maximizeResourceAllocation to true in your
spark configuration classification. The
maximizeResourceAllocation option is specific to Amazon EMR. When you enable
maximizeResourceAllocation, Amazon EMR calculates the maximum compute
and memory resources available for an executor on an instance in the core instance
group. It then sets the corresponding spark-defaults settings based on
the calculated maximum values.
Amazon EMR calculates the maximum compute and memory resources available for an executor based on an instance type from the core instance fleet. Because an instance fleet can contain different instance types and sizes, the executor configuration that Amazon EMR chooses might not be the best fit for your cluster. For this reason, we don't recommend relying on the default settings when you use maximum resource allocation with instance fleets. Configure custom settings for your instance fleet clusters instead.
Note
You should not use the maximizeResourceAllocation option on
clusters with other distributed applications like HBase. Amazon EMR uses custom YARN
configurations for distributed applications, which can conflict with
maximizeResourceAllocation and cause Spark applications to
fail.
The following is an example Spark configuration classification with
maximizeResourceAllocation set to true.
[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
When maximizeResourceAllocation is enabled, Amazon EMR configures the following spark-defaults settings.
| Setting | Description | Value |
|---|---|---|
| spark.default.parallelism | Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user. | 2X number of CPU cores available to YARN containers. |
| spark.driver.memory | Amount of memory to use for the driver process, that is, where SparkContext is initialized (for example, 1g, 2g). | Setting is configured based on the instance types in the cluster. However, because the Spark driver application may run on either the primary or one of the core instances (for example, in YARN client and cluster modes, respectively), this is set based on the smaller of the instance types in these two instance groups. |
| spark.executor.memory | Amount of memory to use per executor process (for example, 1g, 2g). | Setting is configured based on the core and task instance types in the cluster. |
| spark.executor.cores | The number of cores to use on each executor. | Setting is configured based on the core and task instance types in the cluster. |
| spark.executor.instances | The number of executors. | Setting is configured based on the core and task instance types in the cluster. Set unless [...] |
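The two rules in the table that can be written as arithmetic are sketched below: the parallelism value (twice the CPU cores available to YARN containers) and the choice of the smaller instance type for driver sizing. This is an illustration under stated assumptions, not the calculation Amazon EMR runs internally. The function names, the vcore and memory figures, and the use of memory as the measure of "smaller" are all hypothetical.

```python
def default_parallelism(total_yarn_vcores: int) -> int:
    """Twice the number of CPU cores available to YARN containers."""
    return 2 * total_yarn_vcores


def driver_sizing_instance(primary: dict, core: dict) -> dict:
    """Pick the smaller of the primary and core instance types.

    Assumption: "smaller" is compared by memory for this sketch.
    """
    return min(primary, core, key=lambda inst: inst["memory_gib"])


# Hypothetical cluster: two core nodes that each expose 8 vcores to YARN.
total_vcores = 2 * 8
print(default_parallelism(total_vcores))

# Hypothetical instance descriptions for the primary and core groups.
primary = {"type": "m5.xlarge", "memory_gib": 16}
core = {"type": "m5.2xlarge", "memory_gib": 32}
print(driver_sizing_instance(primary, core)["type"])
```

The actual values that Amazon EMR writes to spark-defaults depend on the release and the instance types, so treat the numbers here as placeholders.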
Configuring node decommissioning behavior
With Amazon EMR release 5.9.0 and higher, Spark on Amazon EMR includes a set of features to help ensure that Spark gracefully handles node termination because of a manual resize or an automatic scaling policy request. Amazon EMR implements a deny listing mechanism in Spark that is built on top of the YARN decommissioning mechanism. This mechanism helps ensure that no new tasks are scheduled on a node that is decommissioning, while at the same time allowing tasks that are already running to complete. In addition, there are features to help recover Spark jobs faster if shuffle blocks are lost when a node terminates. The recomputation process is triggered sooner and optimized to recompute faster with fewer stage retries, and jobs can be prevented from failing because of fetch failures that are caused by missing shuffle blocks.
Important
The spark.decommissioning.timeout.threshold setting was added in
Amazon EMR release 5.11.0 to improve Spark resiliency when you use Spot instances. In
earlier releases, when a node uses a Spot instance, and the instance is
terminated because of bid price, Spark may not be able to handle the termination
gracefully. Jobs may fail, and shuffle recomputations could take a significant
amount of time. For this reason, we recommend using release 5.11.0 or later if
you use Spot instances.
| Setting | Description | Default value |
|---|---|---|
| [...] | When set to [...] | [...] |
| [...] | The amount of time that a node in the [...] | [...] |
| spark.decommissioning.timeout.threshold | Available in Amazon EMR release 5.11.0 or later. Specified in seconds. When a node transitions to the decommissioning state, if the host will decommission within a time period equal to or less than this value, Amazon EMR not only deny lists the node, but also cleans up the host state (as specified by [...] | [...] |
| [...] | When set to [...] | [...] |
| [...] | When set to [...] | true |
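The threshold rule described for spark.decommissioning.timeout.threshold can be modeled as a simple comparison. The sketch below is a reading of that description only; the function name, the action labels, and the example values are hypothetical and do not reflect how Amazon EMR implements the behavior.

```python
def decommission_actions(seconds_until_decommission: float,
                         threshold_seconds: float) -> list:
    """Model what happens when a node enters the decommissioning state.

    The node is always deny listed. If it will decommission within the
    threshold (equal to or less than), its host state is also cleaned up.
    """
    actions = ["deny_list_node"]
    if seconds_until_decommission <= threshold_seconds:
        actions.append("clean_up_host_state")
    return actions


# Hypothetical values, in seconds.
print(decommission_actions(seconds_until_decommission=10, threshold_seconds=30))
print(decommission_actions(seconds_until_decommission=120, threshold_seconds=30))
```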
Spark ThriftServer environment variable
Spark sets the Hive Thrift Server Port environment variable,
HIVE_SERVER2_THRIFT_PORT, to 10001.
Changing Spark default settings
You change the defaults in spark-defaults.conf using the
spark-defaults configuration classification or the
maximizeResourceAllocation setting in the spark
configuration classification.
The following procedures show how to modify settings using the CLI or console.
To create a cluster with spark.executor.memory set to 2g using the CLI
- Create a cluster with Spark installed and spark.executor.memory set to 2g, using the following command, which references a file, myConfig.json, stored in Amazon S3.
  aws emr create-cluster --release-label emr-7.10.0 --applications Name=Spark \
  --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/amzn-s3-demo-bucket/myfolder/myConfig.json
  Note
  Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).
  myConfig.json:
  [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]
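If you generate configuration files from a script, the myConfig.json content above can be produced with Python's standard json module. This is a minimal sketch; the helper name spark_defaults_classification is hypothetical, and uploading the result to Amazon S3 is left out.

```python
import json


def spark_defaults_classification(properties: dict) -> list:
    """Wrap Spark properties in a spark-defaults configuration classification."""
    return [{
        "Classification": "spark-defaults",
        # Classification property values are written as JSON strings.
        "Properties": {key: str(value) for key, value in properties.items()},
    }]


config = spark_defaults_classification({"spark.executor.memory": "2G"})
config_json = json.dumps(config, indent=2)
print(config_json)
```

Writing config_json to a file named myConfig.json gives a file with the same structure as the example in this procedure.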
To create a cluster with spark.executor.memory set to 2g using the console
- Navigate to the new Amazon EMR console and select Switch to the old console from the side navigation. For more information on what to expect when you switch to the old console, see Using the old console.
- Choose Create cluster, Go to advanced options.
- Choose Spark.
- Under Edit software settings, leave Enter configuration selected and enter the following configuration:
  classification=spark-defaults,properties=[spark.executor.memory=2G]
- Select other options as needed, and then choose Create cluster.
To set maximizeResourceAllocation
- Create a cluster with Spark installed and maximizeResourceAllocation set to true using the AWS CLI, referencing a file, myConfig.json, stored in Amazon S3.
  aws emr create-cluster --release-label emr-7.10.0 --applications Name=Spark \
  --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/amzn-s3-demo-bucket/myfolder/myConfig.json
  Note
  Linux line continuation characters (\) are included for readability. They can be removed or used in Linux commands. For Windows, remove them or replace with a caret (^).
  myConfig.json:
  [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
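For readers who create clusters from code rather than the AWS CLI, the following sketch shows one way the same request might look with the AWS SDK for Python (boto3) and its run_job_flow call. It mirrors the CLI options in this procedure. The cluster name is a hypothetical placeholder, the configuration is passed inline rather than read from Amazon S3, and the create_cluster function is defined but not called, because it requires boto3 and valid AWS credentials.

```python
def create_cluster_request(configurations: list) -> dict:
    """Build keyword arguments for the EMR RunJobFlow API call."""
    return {
        "Name": "spark-cluster",  # hypothetical cluster name
        "ReleaseLabel": "emr-7.10.0",
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "MasterInstanceType": "m5.xlarge",
            "SlaveInstanceType": "m5.xlarge",
            "InstanceCount": 2,
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "ServiceRole": "EMR_DefaultRole_V2",
        "JobFlowRole": "EMR_EC2_DefaultRole",  # EC2 instance profile
        "Configurations": configurations,
    }


def create_cluster(configurations: list) -> str:
    """Create the cluster and return its ID. Needs boto3 and AWS credentials."""
    import boto3  # imported here so the request builder works without boto3

    emr = boto3.client("emr")
    response = emr.run_job_flow(**create_cluster_request(configurations))
    return response["JobFlowId"]


request = create_cluster_request(
    [{"Classification": "spark",
      "Properties": {"maximizeResourceAllocation": "true"}}]
)
print(request["Configurations"])
```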
Note
With Amazon EMR version 5.21.0 and later, you can override cluster configurations and specify additional configuration classifications for each instance group in a running cluster. You do this by using the Amazon EMR console, the AWS Command Line Interface (AWS CLI), or the AWS SDK. For more information, see Supplying a Configuration for an Instance Group in a Running Cluster.
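As a rough sketch of the SDK route mentioned in the note, the snippet below builds an instance group override and shows how it might be submitted with the boto3 modify_instance_groups call. The cluster ID, the instance group ID, and the helper names are hypothetical placeholders. The apply_override function is defined but not called, because it requires boto3, AWS credentials, and a running cluster.

```python
def instance_group_override(instance_group_id: str, properties: dict) -> dict:
    """Describe a spark-defaults override for one instance group."""
    return {
        "InstanceGroupId": instance_group_id,
        "Configurations": [{
            "Classification": "spark-defaults",
            "Properties": properties,
        }],
    }


def apply_override(cluster_id: str, override: dict) -> None:
    """Submit the override to a running cluster. Needs boto3 and credentials."""
    import boto3  # imported here so the builder above works without boto3

    emr = boto3.client("emr")
    emr.modify_instance_groups(ClusterId=cluster_id, InstanceGroups=[override])


# Hypothetical instance group ID, for illustration only.
override = instance_group_override("ig-EXAMPLE", {"spark.executor.memory": "2G"})
print(override)
```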
Migrating from Apache Log4j 1.x to Log4j 2.x
Apache Spark releases earlier than 3.3.0 use Apache Log4j 1.x and the
log4j.properties file to configure Log4j in Spark processes. Apache
Spark releases 3.3.0 and later use Apache Log4j 2.x and the
log4j2.properties file to configure Log4j in Spark
processes.
If you have configured Apache Spark Log4j using an Amazon EMR release lower than 6.8.0,
then you must remove the legacy spark-log4j configuration
classification and migrate to the spark-log4j2 configuration
classification and key format before you can upgrade to Amazon EMR 6.8.0 or later. The
legacy spark-log4j classification causes cluster creation to fail with
a ValidationException error in Amazon EMR releases 6.8.0 and
later. You will not be charged for a failure related to the Log4j incompatibility,
but you must remove the defunct spark-log4j configuration
classification to continue.
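A small helper can make the release cutoff explicit when you generate configurations for several Amazon EMR releases. The sketch below picks the classification name from a release label of the form emr-x.y.z, as used in the CLI examples in this section. The function name is hypothetical, and it only selects the classification name; it does not convert Log4j 1.x keys to the Log4j 2.x key format.

```python
import re


def spark_log4j_classification(release_label: str) -> str:
    """Return the Spark Log4j classification name for an EMR release label."""
    match = re.fullmatch(r"emr-(\d+)\.(\d+)\.(\d+)", release_label)
    if match is None:
        raise ValueError(f"Unrecognized release label: {release_label}")
    major, minor = int(match.group(1)), int(match.group(2))
    # Releases 6.8.0 and higher use spark-log4j2; lower releases use spark-log4j.
    return "spark-log4j2" if (major, minor) >= (6, 8) else "spark-log4j"


for label in ("emr-6.7.0", "emr-6.8.0", "emr-7.10.0"):
    print(label, spark_log4j_classification(label))
```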
For more information about migrating from Apache Log4j 1.x to Log4j 2.x, see the
Apache Log4j Migration Guide.
Note
With Amazon EMR, Apache Spark uses a log4j2.properties file rather
than the .xml file described in the Apache Log4j Migration Guide.