Otimizar as operações de shuffle -

Otimizar as operações de shuffle

Certas operações, como join() egroupByKey(), exigem que o Spark execute uma operação de shuffle. O shuffle é o mecanismo do Spark para redistribuir dados para que sejam agrupados de forma diferente nas partições do RDD. Realizar a operação de shuffle pode ajudar a remediar gargalos de performance. No entanto, como o a operação de shuffle geralmente envolve a cópia de dados entre os executores do Spark, o shuffle é uma operação complexa e cara. Por exemplo, os shuffles geram os seguintes custos:

  • E/S de disco:

    • Gera um grande número de arquivos intermediários no disco.

  • E/S de rede:

    • Precisa de muitas conexões de rede (número de conexões = Mapper × Reducer).

    • Como os registros são agregados a novas partições do RDD que podem estar hospedadas em um executor diferente do Spark, uma fração substancial do seu conjunto de dados pode se movimentar entre os executores do Spark pela rede.

  • Carga de CPU e memória:

    • Classifica valores e mescla conjuntos de dados. Essas operações são planejadas no executor, sobrecarregando o executor.

O shuffle é um dos fatores mais importantes na degradação da performance da sua aplicação do Spark. Ao armazenar os dados intermediários, ele pode esgotar espaço no disco local do executor, o que faz com que a tarefa do Spark falhe.

Você pode avaliar a performance do shuffle nas métricas do CloudWatch e na interface do usuário do Spark.

métricas do CloudWatch

Se o valor de Bytes gravados de shuffle for alto em comparação com o valor de Bytes lidos de shuffle, seu trabalho do Spark poderá usar operações de shuffle, como ou join() ou groupByKey().

Grafo de Shuffle de Dados em Executores (Bytes) mostrando um aumento nos bytes gravados de shuffle.

IU do Spark

Na guia Etapa da interface do usuário do Spark, você pode verificar os valores de Registros/Tamanho de gravações de shuffle. Você também pode verificá-los na guia Executores.

Na captura de tela a seguir, cada executor troca aproximadamente 18,6 GB/4.020.000 registros com o processo shuffle, para um tamanho total de leitura de shuffle de cerca de 75 GB.

A coluna Vazamento de shuffle (Disco) mostra um grande volume de derramamento de dados da memória para o disco, o que pode causar um disco cheio ou um problema de performance.

""

Se você observar esses sintomas e a etapa demorar muito em comparação com suas metas de performance, ou se ela falhar com os erros Out Of Memory ou No space left on device, considere as soluções a seguir.

Otimizar a junção

A operação join(), que faz a junção de tabelas, é a operação de shuffle mais usada, mas geralmente é um gargalo de performance. Como a junção é uma operação onerosa, recomendamos não usá-la, a menos que seja essencial para seus requisitos de negócios. Certifique-se de que você está utilizando o seu pipeline de dados de forma eficiente, questionando-se sobre o seguinte:

  • Você está recalculando uma junção que também é executada em outros trabalhos que você pode reutilizar?

  • Você está fazendo uma junção para resolver chaves estrangeiras para valores que não são usados pelos consumidores do seu resultado?

Depois de confirmar que suas operações de junção são essenciais para seus requisitos de negócios, consulte as opções a seguir para otimizar sua junção de forma que atenda às suas necessidades.

Usar o pushdown antes realizar uma junção

Filtre linhas e colunas desnecessárias no DataFrame antes de realizar uma junção. Essa abordagem tem as seguintes vantagens:

  • Reduz o volume de transferência de dados durante o shuffle

  • Reduz o volume de processamento no executor do Spark

  • Reduz o volume de verificação de dados

# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])

Usar junção de DataFrame

Tente usar uma API de alto nível do Spark, como SparkSQL, DataFrame e Datasets, em vez da API do RDD ou da junção de DynamicFrame. Você pode converter o DynamicFrame em DataFrame com uma chamada de método como dyf.toDF(). Conforme analisado na seção Principais tópicos do Apache Spark, essas operações de junção aproveitam internamente a otimização de consultas pelo otimizador Catalyst.

Dicas e junções shuffle e broadcast hash

O Spark é compatível com dois tipos de junção: junção shuffle e junção broadcast hash. Uma junção broadcast hash não requer operação de shuffle e pode exigir menos processamento do que uma junção shuffle. No entanto, é aplicável somente ao realizar a junção de uma tabela pequena com uma grande. Ao realizar a junção de uma tabela que cabe na memória de um único executor do Spark, considere usar uma junção broadcast hash.

O diagrama a seguir mostra a estrutura de alto nível e as etapas de uma junção broadcast hash e uma junção shuffle.

Junção broadcast com conexões diretas entre as tabelas e a tabela resultante da junção, e junção shuffle com duas fases de shuffle entre as tabelas e a tabela resultante.

Os detalhes de cada junção são os seguintes:

  • Junção shuffle:

    • A junção shuffle hash combina duas tabelas sem classificação e distribui a junção entre as duas tabelas. É adequado para junções de pequenas tabelas que podem ser armazenadas na memória do executor do Spark.

    • A junção sort-merge distribui as duas tabelas a serem combinadas por chave e as classifica antes juntá-las. É adequado para junções de tabelas grandes.

  • Junção broadcast hash:

    • Uma junção broadcast hash envia o RDD ou a tabela menor para cada um dos nós de processamento. Em seguida, ele realiza uma combinação do lado do map com cada partição do RDD ou tabela de maior volume.

      É adequado para junções quando um de seus RDDs ou tabelas pode caber na memória ou pode ser ajustado para caber na memória. É vantajoso fazer uma junção broadcast hash sempre que possível, porque não requer um shuffle. Você pode usar uma dica de junção para solicitar uma junção broadcast do Spark, como a seguir.

      # DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

      Para obter mais informações sobre dicas de junção, consulte Join hints.

No AWS Glue versão 3.0 e posterior, você pode aproveitar automaticamente as junções broadcast hash habilitando a Execução Adaptativa de Consultas e parâmetros adicionais. A Execução Adaptativa de Consultas converte uma junção sort-merge em uma junção broadcast hash quando as estatísticas de runtime de um dos lados da junção são menores do que o limite de junção broadcast hash adaptativa.

No AWS Glue versão 3.0, você pode habilitar a Execução Adaptativa de Consultas definindo spark.sql.adaptive.enabled=true. A Execução Adaptativa de Consultas está habilitada por padrão no AWS Glue 4.0.

Você pode definir parâmetros adicionais relacionados aos shuffles e às junções broadcast hash:

  • spark.sql.adaptive.localShuffleReader.enabled

  • spark.sql.adaptive.autoBroadcastJoinThreshold

Para obter mais informações sobre parâmetros relacionados, consulte Converting sort-merge join to broadcast join.

No AWS Glue versão 3.0 ou posterior, você pode usar outras dicas de junção para shuffle para ajustar seu comportamento.

-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

Usar bucketing

A junção sort-merge requer duas fases: shuffle e classificar, e depois mesclar. Essas duas fases podem sobrecarregar o executor do Spark e causar problemas de OOM e performance quando alguns dos executores estão sendo mesclados e outros estão sendo classificados simultaneamente. Nesses casos, talvez seja possível juntar com eficiência usando bucketing. O bucketing realizará o pré-shuffle e pré-classificará sua entrada nas chaves de junção. Em seguida, gravará esses dados classificados em uma tabela intermediária. O custo das etapas de shuffle e classificação pode ser reduzido ao juntar tabelas grandes definindo as tabelas intermediárias classificadas com antecedência.

A junção sort-merge tem as etapas adicionais de shuffle e classificar.

As tabelas em buckets são úteis para o seguinte:

  • Dados combinados com frequência pela mesma chave, como account_id

  • Carregar tabelas cumulativas diárias, como tabelas base e delta, que podem ser agrupadas em buckets em uma coluna comum

Você pode criar uma tabela em buckets usando o código a seguir.

df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")

Reparticionar DataFrames nas chaves de junção antes da junção

Para reparticionar os dois DataFrames nas chaves de junção antes da junção, use as instruções a seguir.

df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")

Isso particionará dois RDDs (ainda separados) na chave de junção antes de iniciar a junção. Se os dois RDDs forem particionados na mesma chave com o mesmo código de particionamento, o RDD registrará que seu plano de juntar terá uma grande probabilidade de coexistir no mesmo operador antes de realizar o shuffle para a junção. Isso pode melhorar a performance ao reduzir a atividade da rede e a distorção de dados durante a junção.

Superar distorções de dados

A distorção de dados é uma das causas mais comuns de um gargalo nos trabalhos do Spark. Ela ocorre quando os dados não são distribuídos uniformemente nas partições do RDD. Isso faz com que as tarefas dessa partição demorem muito mais do que outras, atrasando o tempo geral de processamento da aplicação.

Para identificar a distorção de dados, avalie as seguintes métricas na interface do usuário do Spark:

  • Na guia Etapa da interface do usuário do Spark, examine a página Cronograma de eventos. Você pode ver uma distribuição desigual de tarefas na captura de tela a seguir. Tarefas distribuídas de forma desigual ou demorando muito para serem executadas podem indicar distorção de dados.

    O tempo de computação do executor é muito maior para uma tarefa do que para as outras.
  • Outra página importante é o Resumo de métricas, que mostra as estatísticas das tarefas do Spark. A captura de tela a seguir mostra métricas com percentis para Duração, Tempo de GC, Derramamento (memória), Derramamento (disco) e assim por diante.

    Tabela do resumo de métricas com a linha Duração destacada.

    Quando as tarefas forem distribuídas uniformemente, você verá números semelhantes em todos os percentis. Quando houver distorção de dados, você verá valores muito enviesados em cada percentil. No exemplo, a duração da tarefa é inferior a 13 segundos em Mín., 25.º percentil, Mediana e 75.º percentil. Embora a tarefa Máx. tenha processado 100 vezes mais dados do que o 75.º percentil, sua duração de 6,4 minutos é cerca de 30 vezes maior. Isso significa que pelo menos uma tarefa (ou até 25% das tarefas) demorou muito mais do que o resto das tarefas.

Se você vir distorções de dados, tente o seguinte:

  • No AWS Glue versão 3.0, habilite a Execução Adaptativa de Consultas definindo spark.sql.adaptive.enabled=true. A Execução Adaptativa de Consultas está habilitada por padrão no AWS Glue 4.0.

    Você também pode usar a Execução Adaptativa de Consultas para a distorção de dados introduzida pelas junções, definindo os seguintes parâmetros relacionados:

    • spark.sql.adaptive.skewJoin.skewedPartitionFactor

    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    • spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)

    • spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)

    Para obter mais informações, consulte a documentação do Apache Spark.

  • Use chaves com uma vasta gama de valores para as chaves de junção. Em uma junção shuffle, as partições são determinadas para cada valor de hash de uma chave. Se a cardinalidade de uma chave de junção for muito baixa, é mais provável que a função hash faça um trabalho ruim ao distribuir seus dados nas partições. Portanto, se sua aplicação e sua lógica de negócios forem compatíveis, considere usar uma chave de cardinalidade mais alta ou uma chave composta.

    # Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])

Usar cache

Ao usar DataFrames repetitivos, evite shuffle ou computação adicional usando df.cache() ou df.persist() para armazenar em cache os resultados do cálculo na memória e no disco de cada executor do Spark. O Spark também é compatível com RDDs persistentes em disco ou replicação em vários nós (nível de armazenamento).

Por exemplo, você pode persistir os DataFrames adicionando df.persist(). Quando o cache não for mais necessário, você poderá usar unpersist para descartar os dados em cache.

df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()

Remover ações desnecessárias do Spark

Evite executar ações desnecessárias, como count, show ou collect. Conforme analisado na seção Principais tópicos do Apache Spark, o Spark tem carregamento lento. Cada RDD transformado pode ser recalculado toda vez que você executa uma ação nele. Quando você usa várias ações do Spark, vários acessos à origem, cálculos de tarefas e operações de shuffle são executados para cada ação sendo chamada.

Se você não precisar de collect() ou de outras ações em seu ambiente comercial, considere removê-las.

nota

Evitar usar a ação collect() do Spark em ambientes comerciais o máximo possível. A ação collect() retorna todos os resultados de um cálculo no executor do Spark para o driver do Spark, o que pode fazer com que este retorne um erro de OOM. Para evitar um erro de OOM, o Spark define spark.driver.maxResultSize = 1GB por padrão, o que limita o tamanho máximo dos dados retornados ao driver do Spark a 1 GB.