Paralelize tarefas -

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Paralelize tarefas

Para otimizar o desempenho, é importante paralelizar tarefas para cargas e transformações de dados. Conforme discutimos em Tópicos principais do Apache Spark, o número de partições resilientes de conjunto de dados distribuído (RDD) é importante porque determina o grau de paralelismo. Cada tarefa que o Spark cria corresponde a uma partição RDD em uma base 1:1. Para obter o melhor desempenho, você precisa entender como o número de partições RDD é determinado e como esse número é otimizado.

Se você não tiver paralelismo suficiente, os seguintes sintomas serão registrados nas CloudWatchmétricas e na interface do usuário do Spark.

CloudWatch métricas

Verifique a carga da CPU e a utilização da memória. Se alguns executores não estiverem processando durante uma fase do seu trabalho, é apropriado melhorar o paralelismo. Nesse caso, durante o período visualizado, o Executor 1 estava executando uma tarefa, mas os demais executores (2, 3 e 4) não. Você pode inferir que esses executores não receberam tarefas atribuídas pelo driver do Spark.

Gráfico mostrando o driver e apenas um executor.

IU do Spark

Na guia Estágio na interface do Spark, você pode ver o número de tarefas em um estágio. Nesse caso, o Spark executou apenas uma tarefa.

""

Além disso, a linha do tempo do evento mostra o Executor 1 processando uma tarefa. Isso significa que o trabalho nesse estágio foi executado inteiramente em um executor, enquanto os outros estavam ociosos.

Cronograma do evento mostrando apenas uma tarefa.

Se você observar esses sintomas, tente as soluções a seguir para cada fonte de dados.

Paralelize a carga de dados do Amazon S3

Para paralelizar as cargas de dados do Amazon S3, primeiro verifique o número padrão de partições. Em seguida, você pode determinar manualmente um número alvo de partições, mas evite ter muitas partições.

Determine o número padrão de partições

Para o Amazon S3, o número inicial de partições RDD do Spark (cada uma delas corresponde a uma tarefa do Spark) é determinado pelos recursos do seu conjunto de dados do Amazon S3 (por exemplo, formato, compactação e tamanho). Quando você cria um AWS Glue DynamicFrame ou um Spark a DataFrame partir de objetos CSV armazenados no Amazon S3, o número inicial de partições RDD NumPartitions () pode ser calculado aproximadamente da seguinte forma:

  • Tamanho do objeto <= 64 MB: NumPartitions = Number of Objects

  • Tamanho do objeto > 64 MB: NumPartitions = Total Object Size / 64 MB

  • Não divisível (gzip): NumPartitions = Number of Objects

Conforme discutido na seção Reduzir a quantidade de escaneamento de dados, o Spark divide objetos grandes do S3 em divisões que podem ser processadas paralelamente. Quando o objeto é maior que o tamanho da divisão, o Spark divide o objeto e cria uma partição RDD (e uma tarefa) para cada divisão. O tamanho da divisão do Spark é baseado no formato dos dados e no ambiente de execução, mas essa é uma aproximação inicial razoável. Alguns objetos são compactados usando formatos de compactação não divisíveis, como gzip, então o Spark não pode dividi-los.

O NumPartitions valor pode variar dependendo do formato dos dados, da compactação, da AWS Glue versão, do número de AWS Glue trabalhadores e da configuração do Spark.

Por exemplo, quando você carrega um único csv.gz objeto de 10 GB usando um Spark DataFrame, o driver do Spark cria somente uma partição RDD (NumPartitions=1) porque o gzip não pode ser dividido. Isso resulta em uma carga pesada em um executor específico do Spark e nenhuma tarefa é atribuída aos demais executores, conforme descrito na figura a seguir.

Verifique o número real de tarefas (NumPartitions) do estágio na guia Spark Web UI Stage ou execute df.rdd.getNumPartitions() seu código para verificar o paralelismo.

Ao encontrar um arquivo gzip de 10 GB, examine se o sistema que está gerando esse arquivo pode gerá-lo em um formato divisível. Se isso não for uma opção, talvez seja necessário escalar a capacidade do cluster para processar o arquivo. Para executar transformações com eficiência nos dados que você carregou, você precisará reequilibrar seu RDD entre os trabalhadores do seu cluster usando a repartição.

Determine manualmente um número alvo de partições

Dependendo das propriedades de seus dados e da implementação de determinadas funcionalidades pelo Spark, você pode acabar com um NumPartitions valor baixo, mesmo que o trabalho subjacente ainda possa ser paralelizado. Se NumPartitions for muito pequeno, execute df.repartition(N) para aumentar o número de partições para que o processamento possa ser distribuído entre vários executores do Spark.

Nesse caso, a execução df.repartition(100) aumentará NumPartitions de 1 para 100, criando 100 partições de seus dados, cada uma com uma tarefa que pode ser atribuída aos outros executores.

A operação repartition(N) divide os dados inteiros igualmente (10 GB/100 partições = 100 MB/partição), evitando a distorção de dados em determinadas partições.

nota

Quando uma operação aleatória como a join é executada, o número de partições aumenta ou diminui dinamicamente, dependendo do valor de ou. spark.sql.shuffle.partitions spark.default.parallelism Isso facilita uma troca mais eficiente de dados entre os executores do Spark. Para obter mais informações, consulte a documentação do Spark.

Sua meta ao determinar o número alvo de partições é maximizar o uso dos trabalhadores provisionados AWS Glue . O número de AWS Glue trabalhadores e o número de tarefas do Spark estão relacionados por meio do número de v. CPUs O Spark oferece suporte a uma tarefa para cada núcleo de vCPU. Na AWS Glue versão 3.0 ou posterior, você pode calcular um número alvo de partições usando a fórmula a seguir.

# Calculate NumPartitions by WorkerType numExecutors = (NumberOfWorkers - 1) numSlotsPerExecutor = 4 if WorkerType is G.1X 8 if WorkerType is G.2X 16 if WorkerType is G.4X 32 if WorkerType is G.8X NumPartitions = numSlotsPerExecutor * numExecutors # Example: Glue 4.0 / G.1X / 10 Workers numExecutors = ( 10 - 1 ) = 9 # 1 Worker reserved on Spark Driver numSlotsPerExecutor = 4 # G.1X has 4 vCpu core ( Glue 3.0 or later ) NumPartitions = 9 * 4 = 36

Neste exemplo, cada trabalhador G.1X fornece quatro núcleos de vCPU para um executor do Spark (). spark.executor.cores = 4 O Spark suporta uma tarefa para cada núcleo de vCPU, então os executores do G.1X Spark podem executar quatro tarefas simultaneamente (). numSlotPerExecutor Esse número de partições faz uso total do cluster se as tarefas levarem o mesmo tempo. No entanto, algumas tarefas demorarão mais do que outras, criando núcleos ociosos. Se isso acontecer, considere multiplicar numPartitions por 2 ou 3 para dividir e programar com eficiência as tarefas de gargalo.

Muitas partições

Um número excessivo de partições cria um número excessivo de tarefas. Isso causa uma carga pesada no driver do Spark devido à sobrecarga relacionada ao processamento distribuído, como tarefas de gerenciamento e troca de dados entre os executores do Spark.

Se o número de partições em seu trabalho for substancialmente maior do que o número desejado de partições, considere reduzir o número de partições. Você pode reduzir partições usando as seguintes opções:

  • Se o tamanho do arquivo for muito pequeno, use AWS Glue GroupFiles. Você pode reduzir o paralelismo excessivo resultante do lançamento de uma tarefa do Apache Spark para processar cada arquivo.

  • Use coalesce(N) para mesclar partições. Esse é um processo de baixo custo. Ao reduzir o número de partições, coalesce(N) é preferívelrepartition(N), pois repartition(N) executa o shuffle para distribuir igualmente a quantidade de registros em cada partição. Isso aumenta os custos e as despesas gerais de gerenciamento.

  • Use a execução adaptativa de consultas do Spark 3.x. Conforme discutido na seção Tópicos principais do Apache Spark, a Execução Adaptativa de Consultas fornece uma função para aglutinar automaticamente o número de partições. Você pode usar essa abordagem quando não consegue saber o número de partições até realizar a execução.

Paralelize a carga de dados do JDBC

O número de partições Spark RDD é determinado pela configuração. Observe que, por padrão, somente uma única tarefa é executada para verificar todo o conjunto de dados de origem por meio de uma SELECT consulta.

Tanto o Spark AWS Glue DynamicFrames quanto o Spark DataFrames oferecem suporte ao carregamento de dados JDBC paralelizado em várias tarefas. Isso é feito usando where predicados para dividir uma SELECT consulta em várias consultas. Para paralelizar as leituras do JDBC, configure as seguintes opções:

  • Para AWS Glue DynamicFrame, defina hashfield (ou hashexpression) hashpartition e. Para saber mais, consulte Lendo tabelas JDBC em paralelo.

    connection_mysql8_options = { "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test", "dbtable": "medicare_tb", "user": "test", "password": "XXXXXXXXX", "hashexpression":"id", "hashpartitions":"10" } datasource0 = glueContext.create_dynamic_frame.from_options( 'mysql', connection_options=connection_mysql8_options, transformation_ctx= "datasource0" )
  • Para Spark DataFrame, set numPartitionspartitionColumn,lowerBound, e. upperBound Para saber mais, consulte JDBC para outros bancos de dados.

    df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \ .option("dbtable", "medicare_tb") \ .option("user", "test") \ .option("password", "XXXXXXXXXX") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("lowerBound", "0") \ .option("upperBound", "1141455") \ .load() df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")

Paralelize a carga de dados do DynamoDB ao usar o conector ETL

O número de partições Spark RDD é determinado pelo parâmetro. dynamodb.splits Para paralelizar as leituras do Amazon DynamoDB, configure as seguintes opções:

Paralelize a carga de dados do Kinesis Data Streams

O número de partições RDD do Spark é determinado pelo número de fragmentos no stream de dados de origem do Amazon Kinesis Data Streams. Se você tiver apenas alguns fragmentos em seu fluxo de dados, haverá apenas algumas tarefas do Spark. Isso pode resultar em baixo paralelismo nos processos posteriores. Para paralelizar as leituras do Kinesis Data Streams, configure as seguintes opções:

  • Aumente o número de fragmentos para obter mais paralelismo ao carregar dados do Kinesis Data Streams.

  • Se sua lógica no microlote for complexa o suficiente, considere reparticionar os dados no início do lote, depois de eliminar as colunas desnecessárias.

Para obter mais informações, consulte Melhores práticas para otimizar o custo e o desempenho de trabalhos AWS Glue de ETL de streaming.

Paralelize tarefas após o carregamento dos dados

Para paralelizar tarefas após o carregamento dos dados, aumente o número de partições RDD usando as seguintes opções:

  • Reparticione os dados para gerar um número maior de partições, especialmente logo após o carregamento inicial, se a carga em si não puder ser paralelizada.

    Ligue repartition() DynamicFrame ou DataFrame especifique o número de partições. Uma boa regra é duas ou três vezes o número de núcleos disponíveis.

    No entanto, ao escrever uma tabela particionada, isso pode levar a uma explosão de arquivos (cada partição pode potencialmente gerar um arquivo em cada partição da tabela). Para evitar isso, você pode reparticionar sua coluna DataFrame por. Isso usa as colunas de partição da tabela para que os dados sejam organizados antes da gravação. Você pode especificar um número maior de partições sem colocar arquivos pequenos nas partições da tabela. No entanto, tenha cuidado para evitar a distorção de dados, na qual alguns valores de partição acabam com a maioria dos dados e atrasam a conclusão da tarefa.

  • Quando houver embaralhos, aumente o spark.sql.shuffle.partitions valor. Isso também pode ajudar com qualquer problema de memória ao embaralhar.

    Quando você tem mais de 2.001 partições aleatórias, o Spark usa um formato de memória compactada. Se você tiver um número próximo a esse, talvez queira definir o spark.sql.shuffle.partitions valor acima desse limite para obter uma representação mais eficiente.