Reducción de la cantidad de análisis de datos -

Reducción de la cantidad de análisis de datos

Para empezar, considere la posibilidad de cargar solo los datos que necesite. Para mejorar el rendimiento, puede simplemente reducir la cantidad de datos que se cargan en su clúster de Spark para cada origen de datos. Para evaluar si este enfoque es apropiado, use las siguientes métricas.

Puede comprobar los bytes leídos de Amazon S3 en las métricas de CloudWatch y obtener más detalles en la IU de Spark, tal y como se describe en la sección IU de Spark.

Métricas de CloudWatch

Puede ver el tamaño de lectura aproximado de Amazon S3 en Movimiento de datos de ETL (bytes). Esta métrica muestra el número de bytes que leen de Amazon S3 todos los ejecutores desde el informe anterior. Puede utilizarla para supervisar el movimiento de datos de ETL desde Amazon S3 y comparar las tasas de lectura con las tasas de ingesta de orígenes de datos externos.

Pestaña Métricas de gráfico de movimiento de datos de ETL (bytes) mostrando los bytes de S3 escritos y los bytes de S3 leídos.

Si observa un punto de datos de Lectura de bytes de S3 mayor de lo esperado, considere las siguientes soluciones.

Interfaz de usuario de Spark

En la pestaña Etapa de la IU de AWS Glue para Spark, puede ver el tamaño de Entrada y Salida. En el siguiente ejemplo, la etapa 2 lee 47,4 GiB de entrada y 47,7 GiB de salida, mientras que la etapa 5 lee 61,2 MiB de entrada y 56,6 MiB de salida.

""

Cuando utiliza los enfoques de Spark SQL o DataFrame en su trabajo de AWS Glue, en la pestaña SQL / DataFrame se muestran más estadísticas sobre estas etapas. En este caso, la etapa 2 muestra número de archivos leídos: 430, tamaño de los archivos leídos: 47,4 GiB y número de filas de salida: 160 796 570.

""

Si observa que hay una diferencia sustancial de tamaño entre los datos que lee y los datos que utiliza, pruebe las siguientes soluciones.

Amazon S3

Para reducir la cantidad de datos que se cargan en su trabajo al leer desde Amazon S3, tenga en cuenta el tamaño, la compresión, el formato y el diseño del archivo (particiones) del conjunto de datos. Los trabajos de AWS Glue para Spark se suelen utilizar para la ETL de datos sin procesar, pero para un procesamiento distribuido eficiente, es necesario inspeccionar las características del formato del origen de datos.

  • Tamaño del archivo: recomendamos mantener el tamaño de los archivos de entrada y salida dentro de un rango moderado (por ejemplo, 128 MB). Los archivos demasiado pequeños y demasiado grandes pueden causar problemas.

    Un gran número de archivos pequeños provoca los siguientes problemas:

    • Una carga elevada de E/S de la red en Amazon S3 debido a la sobrecarga necesaria para hacer solicitudes (por ejemplo, List, Get o Head) para muchos objetos (en comparación con unos pocos objetos que almacenan la misma cantidad de datos).

    • Una carga elevada de procesamiento y E/S en el controlador de Spark, lo que generará muchas particiones y tareas y provocará un paralelismo excesivo.

    Por otro lado, si el tipo de archivo no se puede dividir (por ejemplo, gzip) y los archivos son demasiado grandes, la aplicación de Spark debe esperar a que una sola tarea termine de leer todo el archivo.

    Para reducir el paralelismo excesivo que se produce cuando se crea una tarea de Apache Spark para cada archivo pequeño, utilice la agrupación de archivos para DynamicFrames. Este enfoque reduce las probabilidades de que se produzca una excepción de OOM por parte del controlador de Spark. Para configurar la agrupación de archivos, defina los parámetros groupFiles y groupSize. En el siguiente ejemplo de código se usa la API DynamicFrame de AWS Glue en un script de ETL con estos parámetros.

    dyf = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://input-s3-path/"], 'recurse':True, 'groupFiles': 'inPartition', 'groupSize': '1048576'}, format="json")
  • Compresión: si los objetos de S3 ocupan cientos de megabytes, considere la posibilidad de comprimirlos. Hay varios formatos de compresión, que se pueden clasificar a grandes rasgos en dos tipos:

    • Los formatos de compresión que no se pueden dividir, como gzip, requieren que un nodo de trabajo descomprima todo el archivo.

    • Los formatos de compresión que se pueden dividir, como bzip2 o LZO (indexados), permiten la descompresión parcial de un archivo, que se puede paralelizar.

    En el caso de Spark (y otros motores de procesamiento distribuido habituales), dividirá el archivo de datos de origen en fragmentos que su motor pueda procesar en paralelo. A estas unidades se las suele denominar divisiones. Una vez que los datos estén en un formato que se pueda dividir, los lectores de AWS Glue optimizados pueden recuperar las divisiones de un objeto de S3 al ofrecer la opción Range a la API GetObject para recuperar solo bloques específicos. Tenga en cuenta el siguiente diagrama para ver cómo funcionaría esto en la práctica.

    Cada uno de los tres nodos de trabajo de AWS Glue se conecta a un archivo dividido en Amazon S3.

    Los datos comprimidos pueden acelerar considerablemente la aplicación, siempre que los archivos tengan un tamaño óptimo o se puedan dividir. Los tamaños de datos más pequeños reducen los datos analizados de Amazon S3 y el tráfico de red de Amazon S3 a su clúster de Spark. Por otro lado, se necesita más CPU para comprimir y descomprimir los datos. La cantidad de computación necesaria escala con la relación de compresión del algoritmo de compresión. Tenga en cuenta esta concesión a la hora de elegir el formato de compresión que se pueda dividir.

    nota

    Si bien los archivos gzip generalmente no se pueden dividir, puede comprimir bloques de Parquet individuales con gzip y esos bloques se pueden paralelizar.

  • Formato de archivo: utilice un formato de columnas. Apache Parquet y Apache ORC son formatos de datos de columnas habituales. Parquet y ORC almacenan los datos de manera eficiente mediante la compresión basada en columnas, codificando y comprimiendo cada columna en función de su tipo de datos. Para obtener más información sobre las codificaciones de Parquet, consulte Parquet encoding definitions. Los archivos de Parquet también se pueden dividir.

    Los formatos de columnas agrupan los valores por columnas y los almacenan juntos en bloques. Cuando utilice formatos de columnas, puede omitir bloques de datos que correspondan a columnas que no tiene previsto utilizar. Las aplicaciones de Spark pueden recuperar solo las columnas que necesita. En general, unos índices de compresión mejores o la omisión de bloques de datos implican leer menos bytes de Amazon S3, lo que se traduce en un mejor rendimiento. Ambos formatos también admiten los siguientes enfoques de inserción para reducir la E/S:

    • Inserción por proyección: la inserción por proyección es una técnica que permite recuperar únicamente las columnas especificadas en la aplicación. Puede especificar columnas en la aplicación de Spark, como se muestra en los siguientes ejemplos:

      • Ejemplo de DataFrame: df.select("star_rating")

      • Ejemplo de Spark SQL: spark.sql("select start_rating from <table>")

    • Inserción de predicados: la inserción de predicados es una técnica para procesar cláusulas WHERE y GROUP BY de forma eficiente. Ambos formatos tienen bloques de datos que representan valores de columnas. Cada bloque contiene las estadísticas del bloque, como los valores máximo y mínimo. Spark puede usar estas estadísticas para determinar si el bloque debe leerse u omitirse en función del valor del filtro utilizado en la aplicación. Para utilizar esta característica, agregue más filtros en las condiciones, como se muestra en los siguientes ejemplos:

      • Ejemplo de DataFrame: df.select("star_rating").filter("star_rating < 2")

      • Ejemplo de Spark SQL: spark.sql("select * from <table> where star_rating < 2")

  • Diseño de archivo: al almacenar los datos de S3 en objetos situados en diferentes rutas en función de cómo se vayan a utilizar los datos, podrá recuperar los datos pertinentes de forma eficiente. Para obtener más información, consulte Organizar objetos con prefijos en la documentación de Amazon S3. AWS Glue admite almacenar claves y valores en prefijos de Amazon S3 en formato key=value y los datos se particionan según la ruta de Amazon S3. La partición de los datos le permite restringir el volumen de datos que analiza cada aplicación de análisis posterior, lo que mejora el rendimiento y reduce los costos. Para obtener más información, consulte Administración de particiones para la salida de ETL en AWS Glue.

    La partición divide la tabla en diferentes partes y mantiene los datos relacionados en archivos agrupados en función de los valores de las columnas, como el año, el mes y el día, como se muestra en el siguiente ejemplo.

    # Partitioning by /YYYY/MM/DD s3://<YourBucket>/year=2023/month=03/day=31/0000.gz s3://<YourBucket>/year=2023/month=03/day=01/0000.gz s3://<YourBucket>/year=2023/month=03/day=02/0000.gz s3://<YourBucket>/year=2023/month=03/day=03/0000.gz ...

    Si quiere definir particiones para su conjunto de datos, modélelo con una tabla en el AWS Glue Data Catalog. A continuación, puede restringir la cantidad de datos analizados mediante la eliminación de particiones, de la siguiente manera:

    • Para DynamicFrame de AWS Glue, defina push_down_predicate (o catalogPartitionPredicate).

      dyf = Glue_context.create_dynamic_frame.from_catalog( database=src_database_name, table_name=src_table_name, push_down_predicate = "year='2023' and month ='03'", )
    • Para DataFrame de Spark, establezca una ruta fija para eliminar las particiones.

      df = spark.read.format("json").load("s3://<YourBucket>/year=2023/month=03/*/*.gz")
    • En el caso de Spark SQL, puede establecer la cláusula where para eliminar las particiones del Catálogo de datos.

      df = spark.sql("SELECT * FROM <Table> WHERE year= '2023' and month = '03'")
    • Para particionar por fecha al escribir los datos con AWS Glue, establezca partitionKeys en DynamicFrame o partitionBy() en DataFrame con la información de la fecha de sus columnas de la siguiente manera.

      • DynamicFrame

        glue_context.write_dynamic_frame_from_options( frame= dyf, connection_type='s3',format='parquet' connection_options= { 'partitionKeys': ["year", "month", "day"], 'path': 's3://<YourBucket>/<Prefix>/' } )
      • DataFrame

        df.write.mode('append')\ .partitionBy('year','month','day')\ .parquet('s3://<YourBucket>/<Prefix>/')

      Esto puede aumentar el rendimiento de los consumidores de sus datos de salida.

      Si no tiene acceso para modificar la canalización que crea el conjunto de datos de entrada, la creación de particiones no es una opción. En su lugar, puede excluir las rutas de S3 innecesarias mediante patrones glob. Establezca exclusiones al leer en DynamicFrame. Por ejemplo, el siguiente código excluye los días de los meses del 01 al 09 del año 2023.

      dyf = glueContext.create_dynamic_frame.from_catalog( database=db, table_name=table, additional_options = { "exclusions":"[\"**year=2023/month=0[1-9]/**\"]" }, transformation_ctx='dyf' )

      También puede establecer exclusiones en las propiedades de la tabla del Catálogo de datos:

      • Clave: exclusions

      • Valor:: ["**year=2023/month=0[1-9]/**"]

  • Demasiadas particiones de Amazon S3: evite particionar los datos de Amazon S3 en columnas que contengan una amplia gama de valores, como una columna de ID con miles de valores. Esto puede aumentar considerablemente el número de particiones del bucket, ya que el número de particiones posibles es el producto de todos los campos por los que ha creado particiones. Demasiadas particiones pueden provocar lo siguiente:

    • Aumento de la latencia para recuperar los metadatos de las particiones del Catálogo de datos.

    • Mayor número de archivos pequeños, lo que requiere más solicitudes a la API de Amazon S3 (List, Get y Head).

    Por ejemplo, si establece un tipo de fecha en partitionBy o partitionKeys, la creación de particiones de nivel de fecha, como yyyy/mm/dd, es adecuada para muchos casos de uso. Sin embargo, yyyy/mm/dd/<ID> podría generar tantas particiones que afectaría negativamente al rendimiento en su conjunto.

    Por otro lado, algunos casos de uso, como las aplicaciones de procesamiento en tiempo real, requieren muchas particiones, como yyyy/mm/dd/hh. Si su caso de uso requiere muchas particiones, considere la posibilidad de utilizar índices de particiones de AWS Glue para reducir la latencia a la hora de recuperar los metadatos de las particiones del Catálogo de datos.

Bases de datos y JDBC

Para reducir el análisis de datos al recuperar información de una base de datos, puede especificar un predicado (o cláusula) where en una consulta SQL. Las bases de datos que no proporcionen una interfaz de SQL ofrecerán su propio mecanismo de consulta o filtrado.

Al utilizar conexiones de Java Database Connectivity (JDBC), proporcione una consulta de selección con la cláusula where para los siguientes parámetros:

  • Para DynamicFrame, utilice la opción sampleQuery. Al utilizar create_dynamic_frame.from_catalog, configure el argumento additional_options de la siguiente manera.

    query = "SELECT * FROM <TableName> where id = 'XX' AND" datasource0 = glueContext.create_dynamic_frame.from_catalog( database = db, table_name = table, additional_options={ "sampleQuery": query, "hashexpression": key, "hashpartitions": 10, "enablePartitioningForSampleQuery": True }, transformation_ctx = "datasource0" )

    Cuando using create_dynamic_frame.from_options, configure el argumento connection_options de la siguiente manera.

    query = "SELECT * FROM <TableName> where id = 'XX' AND" datasource0 = glueContext.create_dynamic_frame.from_options( connection_type = connection, connection_options={ "url": url, "user": user, "password": password, "dbtable": table, "sampleQuery": query, "hashexpression": key, "hashpartitions": 10, "enablePartitioningForSampleQuery": True } )
  • Para DataFrame, utilice la opción query.

    query = "SELECT * FROM <TableName> where id = 'XX'" jdbcDF = spark.read \ .format('jdbc') \ .option('url', url) \ .option('user', user) \ .option('password', pwd) \ .option('query', query) \ .load()
  • En el caso de Amazon Redshift, utilice la versión 4.0 o posteriores de AWS Glue para aprovechar la compatibilidad con la inserción en el conector de Amazon Redshift Spark.

    dyf = glueContext.create_dynamic_frame.from_catalog( database = "redshift-dc-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["temp-s3-dir"], additional_options = {"aws_iam_role": "arn:aws:iam::role-account-id:role/rs-role-name"} )
  • Para otras bases de datos, consulte la documentación correspondiente.

AWS GlueOpciones de

  • Para evitar un análisis completo de todas las ejecuciones continuas de trabajos y procesar solo los datos que no estuvieron presentes durante la última ejecución del trabajo, habilite los marcadores de trabajos.

  • Para limitar la cantidad de datos de entrada que se van a procesar, habilite la ejecución delimitada con marcadores de trabajos. Esto ayuda a reducir la cantidad de datos analizados para cada ejecución de trabajo.