Paralelizar tarefas -

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Paralelizar tarefas

Para otimizar a performance, é importante paralelizar tarefas para cargas e transformações de dados. Conforme analisado em Principais tópicos do Apache Spark, o número de partições resilientes do conjunto de dados distribuído (RDD) é importante, pois determina o grau de paralelismo. Cada tarefa que o Spark cria corresponde a uma partição do RDD em uma base 1:1. Para obter a melhor performance, você precisa entender como o número de partições do RDD é determinado e como esse número é otimizado.

Se você não tiver paralelismo suficiente, os seguintes sintomas serão registrados nas CloudWatchmétricas e na interface do usuário do Spark.

CloudWatch métricas

Verifique a carga da CPU e a utilização da memória. Se alguns executores não estiverem processando durante uma fase do seu trabalho, é apropriado melhorar o paralelismo. Nesse caso, durante o período visualizado, o Executor 1 estava executando uma tarefa, mas os demais executores (2, 3 e 4) não. Você pode inferir que esses executores não tiveram tarefas atribuídas pelo driver do Spark.

Grafo mostrando o driver e apenas um executor.

Interface do usuário do Spark

Na guia Etapa na interface do usuário do Spark, você pode ver o número de tarefas em uma etapa. Nesse caso, o Spark executou apenas uma tarefa.

""

Além disso, o cronograma de eventos mostra o Executor 1 processando uma tarefa. Isso significa que o trabalho nessa etapa foi executado por completo em um executor, enquanto os outros estavam ociosos.

Cronograma de eventos mostrando apenas uma tarefa.

Se você observar esses sintomas, tente as soluções a seguir para cada fonte de dados.

Paralelizar a carga de dados do Amazon S3

Para paralelizar cargas de dados do Amazon S3, primeiro verifique o número padrão de partições. Você então pode determinar manualmente um número alvo de partições, mas evite ter muitas partições.

Determinar o número padrão de partições

Para o Amazon S3, o número inicial de partições do Spark RDD (cada uma delas corresponde a uma tarefa do Spark) é determinado pelos recursos do seu conjunto de dados do Amazon S3 (por exemplo, formato, compactação e tamanho). Quando você cria um AWS Glue DynamicFrame ou um Spark a DataFrame partir de objetos CSV armazenados no Amazon S3, o número inicial de partições RDD NumPartitions () pode ser calculado aproximadamente da seguinte forma:

  • Tamanho do objeto <= 64 MB: NumPartitions = Number of Objects

  • Tamanho do objeto > 64 MB: NumPartitions = Total Object Size / 64 MB

  • Não divisível (gzip): NumPartitions = Number of Objects

Conforme analisado na seção Reduzir a quantidade de dados verificados, o Spark divide objetos grandes do S3 em splits que podem ser processados paralelamente. Quando o objeto é maior que o tamanho do split, o Spark divide o objeto e cria uma partição do RDD (e uma tarefa) para cada split. O tamanho do split do Spark é baseado no formato dos dados e no ambiente de runtime, mas esta é uma aproximação inicial razoável. Alguns objetos são compactados usando formatos de compactação não divisíveis, como gzip, para que o Spark não possa dividi-los.

O NumPartitions valor pode variar dependendo do formato dos dados, da compactação, da AWS Glue versão, do número de AWS Glue trabalhadores e da configuração do Spark.

Por exemplo, quando você carrega um único csv.gz objeto de 10 GB usando um Spark DataFrame, o driver do Spark cria somente uma partição RDD (NumPartitions=1) porque o gzip não pode ser dividido. Isso resulta em uma carga pesada em um executor específico do Spark e nenhuma tarefa é atribuída aos demais executores, conforme descrito na figura a seguir.

Verifique o número real de tarefas (NumPartitions) da etapa na guia Etapa da Interface do usuário web do Spark, ou execute df.rdd.getNumPartitions() em seu código para verificar o paralelismo.

Ao encontrar um arquivo gzip de 10 GB, examine se o sistema que está gerando esse arquivo pode gerá-lo em um formato divisível. Se isso não for uma opção, talvez seja necessário escalar a capacidade do cluster para processar o arquivo. Para executar transformações com eficiência nos dados que você carregou, você precisará rebalancear seu RDD nos operadores do seu cluster usando a repartição.

Determinar manualmente um número alvo de partições

Dependendo das propriedades de seus dados e da implementação de determinadas funcionalidades pelo Spark, você pode acabar com um valor de NumPartitions baixo, mesmo que o trabalho subjacente ainda possa ser paralelizado. Se NumPartitions for muito pequeno, execute df.repartition(N) para aumentar o número de partições para que o processamento possa ser distribuído nos vários executores do Spark.

Nesse caso, a execução df.repartition(100) aumentará o NumPartitions de 1 para 100, criando 100 partições de seus dados, cada uma com uma tarefa que pode ser atribuída aos outros executores.

A operação repartition(N) divide os dados inteiros igualmente (10 GB/100 partições = 100 MB/partição), evitando a distorção de dados em determinadas partições.

nota

Quando uma operação de shuffle, como join, é executada, o número de partições aumenta ou diminui dinamicamente, dependendo do valor de spark.sql.shuffle.partitions ou spark.default.parallelism. Isso facilita uma troca mais eficiente de dados entre os executores do Spark. Para obter mais informações, consulte a documentação do Spark.

Sua meta ao determinar o número alvo de partições é maximizar o uso dos trabalhadores provisionados AWS Glue . O número de AWS Glue trabalhadores e o número de tarefas do Spark estão relacionados por meio do número de v. CPUs O Spark oferece suporte a uma tarefa para cada núcleo de vCPU. Na AWS Glue versão 3.0 ou posterior, você pode calcular um número alvo de partições usando a fórmula a seguir.

# Calculate NumPartitions by WorkerType numExecutors = (NumberOfWorkers - 1) numSlotsPerExecutor = 4 if WorkerType is G.1X 8 if WorkerType is G.2X 16 if WorkerType is G.4X 32 if WorkerType is G.8X NumPartitions = numSlotsPerExecutor * numExecutors # Example: Glue 4.0 / G.1X / 10 Workers numExecutors = ( 10 - 1 ) = 9 # 1 Worker reserved on Spark Driver numSlotsPerExecutor = 4 # G.1X has 4 vCpu core ( Glue 3.0 or later ) NumPartitions = 9 * 4 = 36

Neste exemplo, cada operador G.1X fornece quatro núcleos de vCPU para um executor do Spark (spark.executor.cores = 4). O Spark oferece suporte a uma tarefa para cada núcleo de vCPU, então os executores G.1X do Spark podem executar quatro tarefas simultaneamente (numSlotPerExecutor). Esse número de partições fará uso total do cluster se as tarefas levarem o mesmo tempo. No entanto, algumas tarefas demorarão mais do que outras, criando núcleos ociosos. Se isso acontecer, considere multiplicar numPartitions por 2 ou 3 para dividir e programar com eficiência as tarefas de gargalo.

Muitas partições

Um número excessivo de partições cria um número excessivo de tarefas. Isso causa uma carga pesada no driver do Spark devido à sobrecarga relacionada ao processamento distribuído, como tarefas de gerenciamento e troca de dados entre os executores do Spark.

Se o número de partições em seu trabalho for substancialmente maior do que o número alvo de partições, considere reduzir o número de partições. É possível reduzir partições usando as seguintes opções:

  • Se o tamanho do arquivo for muito pequeno, use AWS Glue GroupFiles. Você pode reduzir o paralelismo excessivo resultante do lançamento de uma tarefa do Apache Spark para processar cada arquivo.

  • Use coalesce(N) para mesclar as partições. Este é um processo de baixo custo. Ao reduzir o número de partições, coalesce(N) é preferível em vez de repartition(N), pois repartition(N) executa o shuffle para distribuir igualmente a quantidade de registros em cada partição. Isso aumenta os custos e a sobrecarga de gerenciamento.

  • Use a Execução Adaptativa de Consultas do Spark 3.x. Conforme analisado na seção Principais tópicos do Apache Spark, a Execução Adaptativa de Consultas fornece uma função para aglutinar automaticamente o número de partições. Você pode usar essa abordagem quando não consegue saber o número de partições até realizar a execução.

Paralelizar a carga de dados do JDBC

O número de partições do Spark RDD é determinado pela configuração. Observe que, por padrão, somente uma única tarefa é executada para verificar todo o conjunto de dados de origem por meio de uma consulta SELECT.

Tanto o Spark AWS Glue DynamicFrames quanto o Spark DataFrames oferecem suporte ao carregamento de dados JDBC paralelizado em várias tarefas. Isso é feito usando predicados where para dividir uma consulta SELECT em várias consultas. Para paralelizar as leituras do JDBC, configure as seguintes opções:

  • Para AWS Glue DynamicFrame, defina hashfield (ou hashexpression) hashpartition e. Para saber mais, consulte Leitura de tabelas JDBC em paralelo.

    connection_mysql8_options = { "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test", "dbtable": "medicare_tb", "user": "test", "password": "XXXXXXXXX", "hashexpression":"id", "hashpartitions":"10" } datasource0 = glueContext.create_dynamic_frame.from_options( 'mysql', connection_options=connection_mysql8_options, transformation_ctx= "datasource0" )
  • Para Spark DataFrame, set numPartitionspartitionColumn,lowerBound, e. upperBound Para saber mais, consulte JDBC To Other Databases.

    df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \ .option("dbtable", "medicare_tb") \ .option("user", "test") \ .option("password", "XXXXXXXXXX") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("lowerBound", "0") \ .option("upperBound", "1141455") \ .load() df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")

Paralelizar a carga de dados do DynamoDB ao usar o conector ETL

O número de partições do Spark RDD é determinado pelo parâmetro dynamodb.splits. Para paralelizar as leituras do Amazon DynamoDB, configure as seguintes opções:

Paralelizar a carga de dados do Kinesis Data Streams

O número de partições do Spark RDD é determinado pelo número de fragmentos no fluxo de dados de origem do Amazon Kinesis Data Streams. Se você tiver apenas alguns fragmentos em seu fluxo de dados, haverá apenas algumas tarefas do Spark. Isso pode resultar em baixo paralelismo nos processos downstream. Para paralelizar as leituras do Kinesis Data Streams, configure as seguintes opções:

  • Aumente o número de fragmentos para obter mais paralelismo ao carregar dados do Kinesis Data Streams.

  • Se sua lógica no microlote for complexa o suficiente, considere reparticionar os dados no início do lote, depois de remover as colunas desnecessárias.

Para obter mais informações, consulte Melhores práticas para otimizar o custo e o desempenho de trabalhos AWS Glue de ETL de streaming.

Paralelizar tarefas após o carregamento dos dados

Para paralelizar tarefas após o carregamento dos dados, aumente o número de partições do RDD usando as seguintes opções:

  • Reparticione os dados para gerar um número maior de partições, especialmente logo após o carregamento inicial, se a carga em si não puder ser paralelizada.

    Ligue repartition() DynamicFrame ou DataFrame especifique o número de partições. Uma boa regra é duas ou três vezes o número de núcleos disponíveis.

    No entanto, ao gravar uma tabela particionada, isso pode levar a uma explosão de arquivos (cada partição pode potencialmente gerar um arquivo em cada partição da tabela). Para evitar isso, você pode reparticionar sua coluna DataFrame por. Isso usa as colunas de partição da tabela para que os dados sejam organizados antes da gravação. Você pode especificar um número maior de partições sem colocar arquivos pequenos nas partições da tabela. No entanto, tenha cuidado para evitar a distorção de dados, em que alguns valores de partição acabam com a maioria dos dados e atrasam a conclusão da tarefa.

  • Quando houver shuffles, aumente o valor de spark.sql.shuffle.partitions. Isso também pode ajudar com qualquer problema de memória ao realizar a operação de shuffle.

    Quando você tem mais de 2.001 partições de shuffle, o Spark usa um formato de memória compactada. Se você tiver um número próximo a esse, talvez queira definir o valor de spark.sql.shuffle.partitions acima desse limite para obter uma representação mais eficiente.