As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Paralelize tarefas
Para otimizar o desempenho, é importante paralelizar tarefas para cargas e transformações de dados. Conforme discutimos em Tópicos principais do Apache Spark, o número de partições resilientes de conjunto de dados distribuído (RDD) é importante porque determina o grau de paralelismo. Cada tarefa que o Spark cria corresponde a uma partição RDD em uma base 1:1. Para obter o melhor desempenho, você precisa entender como o número de partições RDD é determinado e como esse número é otimizado.
Se você não tiver paralelismo suficiente, os seguintes sintomas serão registrados nas CloudWatchmétricas e na interface do usuário do Spark.
CloudWatch métricas
Verifique a carga da CPU e a utilização da memória. Se alguns executores não estiverem processando durante uma fase do seu trabalho, é apropriado melhorar o paralelismo. Nesse caso, durante o período visualizado, o Executor 1 estava executando uma tarefa, mas os demais executores (2, 3 e 4) não. Você pode inferir que esses executores não receberam tarefas atribuídas pelo driver do Spark.

IU do Spark
Na guia Estágio na interface do Spark, você pode ver o número de tarefas em um estágio. Nesse caso, o Spark executou apenas uma tarefa.

Além disso, a linha do tempo do evento mostra o Executor 1 processando uma tarefa. Isso significa que o trabalho nesse estágio foi executado inteiramente em um executor, enquanto os outros estavam ociosos.

Se você observar esses sintomas, tente as soluções a seguir para cada fonte de dados.
Paralelize a carga de dados do Amazon S3
Para paralelizar as cargas de dados do Amazon S3, primeiro verifique o número padrão de partições. Em seguida, você pode determinar manualmente um número alvo de partições, mas evite ter muitas partições.
Determine o número padrão de partições
Para o Amazon S3, o número inicial de partições RDD do Spark (cada uma delas corresponde a uma tarefa do Spark) é determinado pelos recursos do seu conjunto de dados do Amazon S3 (por exemplo, formato, compactação e tamanho). Quando você cria um AWS Glue DynamicFrame ou um Spark a DataFrame partir de objetos CSV armazenados no Amazon S3, o número inicial de partições RDD NumPartitions
() pode ser calculado aproximadamente da seguinte forma:
-
Tamanho do objeto <= 64 MB:
NumPartitions = Number of Objects
-
Tamanho do objeto > 64 MB:
NumPartitions = Total Object Size / 64 MB
-
Não divisível (gzip):
NumPartitions = Number of Objects
Conforme discutido na seção Reduzir a quantidade de escaneamento de dados, o Spark divide objetos grandes do S3 em divisões que podem ser processadas paralelamente. Quando o objeto é maior que o tamanho da divisão, o Spark divide o objeto e cria uma partição RDD (e uma tarefa) para cada divisão. O tamanho da divisão do Spark é baseado no formato dos dados e no ambiente de execução, mas essa é uma aproximação inicial razoável. Alguns objetos são compactados usando formatos de compactação não divisíveis, como gzip, então o Spark não pode dividi-los.
O NumPartitions
valor pode variar dependendo do formato dos dados, da compactação, da AWS Glue versão, do número de AWS Glue trabalhadores e da configuração do Spark.
Por exemplo, quando você carrega um único csv.gz
objeto de 10 GB usando um Spark DataFrame, o driver do Spark cria somente uma partição RDD (NumPartitions=1
) porque o gzip não pode ser dividido. Isso resulta em uma carga pesada em um executor específico do Spark e nenhuma tarefa é atribuída aos demais executores, conforme descrito na figura a seguir.
Verifique o número real de tarefas (NumPartitions
) do estágio na guia Spark Web UI Stage ou execute df.rdd.getNumPartitions()
seu código para verificar o paralelismo.
Ao encontrar um arquivo gzip de 10 GB, examine se o sistema que está gerando esse arquivo pode gerá-lo em um formato divisível. Se isso não for uma opção, talvez seja necessário escalar a capacidade do cluster para processar o arquivo. Para executar transformações com eficiência nos dados que você carregou, você precisará reequilibrar seu RDD entre os trabalhadores do seu cluster usando a repartição.
Determine manualmente um número alvo de partições
Dependendo das propriedades de seus dados e da implementação de determinadas funcionalidades pelo Spark, você pode acabar com um NumPartitions
valor baixo, mesmo que o trabalho subjacente ainda possa ser paralelizado. Se NumPartitions
for muito pequeno, execute df.repartition(N)
para aumentar o número de partições para que o processamento possa ser distribuído entre vários executores do Spark.
Nesse caso, a execução df.repartition(100)
aumentará NumPartitions
de 1 para 100, criando 100 partições de seus dados, cada uma com uma tarefa que pode ser atribuída aos outros executores.
A operação repartition(N)
divide os dados inteiros igualmente (10 GB/100 partições = 100 MB/partição), evitando a distorção de dados em determinadas partições.
nota
Quando uma operação aleatória como a join
é executada, o número de partições aumenta ou diminui dinamicamente, dependendo do valor de ou. spark.sql.shuffle.partitions
spark.default.parallelism
Isso facilita uma troca mais eficiente de dados entre os executores do Spark. Para obter mais informações, consulte a documentação do Spark
Sua meta ao determinar o número alvo de partições é maximizar o uso dos trabalhadores provisionados AWS Glue . O número de AWS Glue trabalhadores e o número de tarefas do Spark estão relacionados por meio do número de v. CPUs O Spark oferece suporte a uma tarefa para cada núcleo de vCPU. Na AWS Glue versão 3.0 ou posterior, você pode calcular um número alvo de partições usando a fórmula a seguir.
# Calculate NumPartitions by WorkerType numExecutors = (NumberOfWorkers - 1) numSlotsPerExecutor = 4 if WorkerType is G.1X 8 if WorkerType is G.2X 16 if WorkerType is G.4X 32 if WorkerType is G.8X NumPartitions = numSlotsPerExecutor * numExecutors # Example: Glue 4.0 / G.1X / 10 Workers numExecutors = ( 10 - 1 ) = 9 # 1 Worker reserved on Spark Driver numSlotsPerExecutor = 4 # G.1X has 4 vCpu core ( Glue 3.0 or later ) NumPartitions = 9 * 4 = 36
Neste exemplo, cada trabalhador G.1X fornece quatro núcleos de vCPU para um executor do Spark (). spark.executor.cores = 4
O Spark suporta uma tarefa para cada núcleo de vCPU, então os executores do G.1X Spark podem executar quatro tarefas simultaneamente (). numSlotPerExecutor
Esse número de partições faz uso total do cluster se as tarefas levarem o mesmo tempo. No entanto, algumas tarefas demorarão mais do que outras, criando núcleos ociosos. Se isso acontecer, considere multiplicar numPartitions
por 2 ou 3 para dividir e programar com eficiência as tarefas de gargalo.
Muitas partições
Um número excessivo de partições cria um número excessivo de tarefas. Isso causa uma carga pesada no driver do Spark devido à sobrecarga relacionada ao processamento distribuído, como tarefas de gerenciamento e troca de dados entre os executores do Spark.
Se o número de partições em seu trabalho for substancialmente maior do que o número desejado de partições, considere reduzir o número de partições. Você pode reduzir partições usando as seguintes opções:
-
Se o tamanho do arquivo for muito pequeno, use AWS Glue GroupFiles. Você pode reduzir o paralelismo excessivo resultante do lançamento de uma tarefa do Apache Spark para processar cada arquivo.
-
Use
coalesce(N)
para mesclar partições. Esse é um processo de baixo custo. Ao reduzir o número de partições,coalesce(N)
é preferívelrepartition(N)
, poisrepartition(N)
executa o shuffle para distribuir igualmente a quantidade de registros em cada partição. Isso aumenta os custos e as despesas gerais de gerenciamento. -
Use a execução adaptativa de consultas do Spark 3.x. Conforme discutido na seção Tópicos principais do Apache Spark, a Execução Adaptativa de Consultas fornece uma função para aglutinar automaticamente o número de partições. Você pode usar essa abordagem quando não consegue saber o número de partições até realizar a execução.
Paralelize a carga de dados do JDBC
O número de partições Spark RDD é determinado pela configuração. Observe que, por padrão, somente uma única tarefa é executada para verificar todo o conjunto de dados de origem por meio de uma SELECT
consulta.
Tanto o Spark AWS Glue DynamicFrames quanto o Spark DataFrames oferecem suporte ao carregamento de dados JDBC paralelizado em várias tarefas. Isso é feito usando where
predicados para dividir uma SELECT
consulta em várias consultas. Para paralelizar as leituras do JDBC, configure as seguintes opções:
-
Para AWS Glue DynamicFrame, defina
hashfield
(ouhashexpression)
hashpartition
e. Para saber mais, consulte Lendo tabelas JDBC em paralelo.connection_mysql8_options = { "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test", "dbtable": "medicare_tb", "user": "test", "password": "XXXXXXXXX", "hashexpression":"id", "hashpartitions":"10" } datasource0 = glueContext.create_dynamic_frame.from_options( 'mysql', connection_options=connection_mysql8_options, transformation_ctx= "datasource0" )
-
Para Spark DataFrame, set
numPartitions
partitionColumn
,lowerBound
, e.upperBound
Para saber mais, consulte JDBC para outros bancos dedados. df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \ .option("dbtable", "medicare_tb") \ .option("user", "test") \ .option("password", "XXXXXXXXXX") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("lowerBound", "0") \ .option("upperBound", "1141455") \ .load() df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")
Paralelize a carga de dados do DynamoDB ao usar o conector ETL
O número de partições Spark RDD é determinado pelo parâmetro. dynamodb.splits
Para paralelizar as leituras do Amazon DynamoDB, configure as seguintes opções:
-
Aumente o valor do
dynamodb.splits
. -
Otimize o parâmetro seguindo a fórmula explicada em Tipos e opções de conexão para ETL no AWS Glue Spark.
Paralelize a carga de dados do Kinesis Data Streams
O número de partições RDD do Spark é determinado pelo número de fragmentos no stream de dados de origem do Amazon Kinesis Data Streams. Se você tiver apenas alguns fragmentos em seu fluxo de dados, haverá apenas algumas tarefas do Spark. Isso pode resultar em baixo paralelismo nos processos posteriores. Para paralelizar as leituras do Kinesis Data Streams, configure as seguintes opções:
-
Aumente o número de fragmentos para obter mais paralelismo ao carregar dados do Kinesis Data Streams.
-
Se sua lógica no microlote for complexa o suficiente, considere reparticionar os dados no início do lote, depois de eliminar as colunas desnecessárias.
Para obter mais informações, consulte Melhores práticas para otimizar o custo e o desempenho de trabalhos AWS Glue de ETL de streaming
Paralelize tarefas após o carregamento dos dados
Para paralelizar tarefas após o carregamento dos dados, aumente o número de partições RDD usando as seguintes opções:
-
Reparticione os dados para gerar um número maior de partições, especialmente logo após o carregamento inicial, se a carga em si não puder ser paralelizada.
Ligue
repartition()
DynamicFrame ou DataFrame especifique o número de partições. Uma boa regra é duas ou três vezes o número de núcleos disponíveis.No entanto, ao escrever uma tabela particionada, isso pode levar a uma explosão de arquivos (cada partição pode potencialmente gerar um arquivo em cada partição da tabela). Para evitar isso, você pode reparticionar sua coluna DataFrame por. Isso usa as colunas de partição da tabela para que os dados sejam organizados antes da gravação. Você pode especificar um número maior de partições sem colocar arquivos pequenos nas partições da tabela. No entanto, tenha cuidado para evitar a distorção de dados, na qual alguns valores de partição acabam com a maioria dos dados e atrasam a conclusão da tarefa.
-
Quando houver embaralhos, aumente o
spark.sql.shuffle.partitions
valor. Isso também pode ajudar com qualquer problema de memória ao embaralhar.Quando você tem mais de 2.001 partições aleatórias, o Spark usa um formato de memória compactada. Se você tiver um número próximo a esse, talvez queira definir o
spark.sql.shuffle.partitions
valor acima desse limite para obter uma representação mais eficiente.