Migración in situ - AWS Guía prescriptiva

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Migración in situ

La migración in situ elimina la necesidad de volver a escribir todos los archivos de datos. En su lugar, los archivos de metadatos de Iceberg se generan y se vinculan a los archivos de datos existentes. Este método suele ser más rápido y rentable, especialmente para conjuntos de datos o tablas de gran tamaño que tienen formatos de archivo compatibles, como Parquet, Avro y ORC.

nota

La migración in situ no se puede utilizar al migrar a Amazon S3 Tables.

Iceberg ofrece dos opciones principales para implementar la migración in situ:

  • Se utiliza el procedimiento de captura de pantalla para crear una nueva tabla de Iceberg y, al mismo tiempo, mantener la tabla de origen sin cambios. Para obtener más información, consulte la tabla de instantáneas en la documentación de Iceberg.

  • Uso del procedimiento de migración para crear una nueva tabla Iceberg en sustitución de la tabla de origen. Para obtener más información, consulte Migrate Table en la documentación de Iceberg. Aunque este procedimiento funciona con Hive Metastore (HMS), actualmente no es compatible con. AWS Glue Data Catalog El procedimiento de replicación de la tabla, que se describe más AWS Glue Data Catalog adelante en esta guía, proporciona una solución alternativa para lograr un resultado similar con el catálogo de datos.

Después de realizar la migración in situ mediante uno snapshot o varios archivos de datosmigrate, es posible que algunos archivos de datos permanezcan sin migrar. Esto suele ocurrir cuando los escritores siguen escribiendo en la tabla de origen durante o después de la migración. Para incorporar los archivos restantes a la tabla Iceberg, puede utilizar el procedimiento add_files. Para obtener más información, consulte Añadir archivos en la documentación de Iceberg.

Supongamos que tiene una products tabla basada en Parquet que se creó y rellenó en Athena de la siguiente manera:

CREATE EXTERNAL TABLE mydb.products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; INSERT INTO mydb.products VALUES (1001, 'Smartphone', 'electronics'), (1002, 'Laptop', 'electronics'), (2001, 'T-Shirt', 'clothing'), (2002, 'Jeans', 'clothing');

En las siguientes secciones se explica cómo puede utilizar los migrate procedimientos snapshot y con esta tabla.

Opción 1: procedimiento de captura de pantalla

El snapshot procedimiento crea una nueva tabla Iceberg con un nombre diferente, pero que replica el esquema y la partición de la tabla de origen. Esta operación deja la tabla de origen completamente inalterada durante y después de la acción. De forma eficaz, crea una copia ligera de la tabla, que resulta especialmente útil para probar escenarios o explorar datos sin correr el riesgo de modificar la fuente de datos original. Este enfoque permite un período de transición en el que tanto la tabla original como la tabla Iceberg permanecen disponibles (consulte las notas al final de esta sección). Una vez finalizadas las pruebas, puede pasar la nueva mesa Iceberg a la de producción haciendo la transición de todos los grabadores y lectores a la nueva tabla.

Puede ejecutar el snapshot procedimiento mediante Spark en cualquier modelo de implementación de Amazon EMR (por ejemplo, Amazon EMR en Amazon EMR, Amazon EC2 EMR en EKS, EMR Serverless) y. AWS Glue

Para probar la migración in situ con el procedimiento de snapshot Spark, siga estos pasos:

  1. Abre una aplicación de Spark y configura la sesión de Spark con los siguientes ajustes:

    • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

    • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

    • "spark.sql.catalog.spark_catalog.type":"glue"

    • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

  2. Ejecute el snapshot procedimiento para crear una nueva tabla de Iceberg que apunte a los archivos de datos de la tabla original:

    spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False)

    El marco de datos de salida contiene imported_files_count (el número de archivos que se agregaron).

  3. Valide la nueva tabla consultándola:

    spark.sql(f""" SELECT * FROM mydb.products_iceberg LIMIT 10 """ ).show(truncate=False)
Notas
  • Tras ejecutar el procedimiento, cualquier modificación del archivo de datos en la tabla de origen desincronizará la tabla generada. Los archivos nuevos que añada no estarán visibles en la tabla Iceberg y los archivos que elimine afectarán a las capacidades de consulta de la tabla Iceberg. Para evitar los problemas de sincronización:

  • Al utilizar snapshot este procedimiento, la gc.enabled propiedad se establece true en las propiedades de la tabla Iceberg creada. Esta configuración prohíbe acciones como expire_snapshotsremove_orphan_files, o DROP TABLE con la PURGE opción, que eliminarían físicamente los archivos de datos. Las operaciones de eliminación o fusión tipo iceberg, que no afectan directamente a los archivos fuente, siguen estando permitidas. 

  • Para que su nueva tabla Iceberg sea completamente funcional, sin límites en cuanto a las acciones que eliminan físicamente los archivos de datos, puede cambiar la propiedad de la gc.enabled tabla a. false Sin embargo, esta configuración permitirá realizar acciones que afecten a los archivos de datos de origen, lo que podría dañar el acceso a la tabla original. Por lo tanto, cambie la gc.enabled propiedad solo si ya no necesita mantener la funcionalidad de la tabla original. Por ejemplo:

    spark.sql(f""" ALTER TABLE mydb.products_iceberg SET TBLPROPERTIES ('gc.enabled' = 'false'); """)

Opción 2: procedimiento de migración

El migrate procedimiento crea una nueva tabla Iceberg que tiene el mismo nombre, esquema y partición que la tabla de origen. Cuando se ejecuta este procedimiento, bloquea la tabla de origen y le cambia el nombre a <table_name>_BACKUP_ (o a un nombre personalizado especificado en el parámetro del backup_table_name procedimiento).

nota

Si establece el parámetro de drop_backup procedimiento entrue, la tabla original no se conservará como copia de seguridad.

En consecuencia, el procedimiento de la migrate tabla requiere que se detengan todas las modificaciones que afecten a la tabla de origen antes de realizar la acción. Antes de ejecutar el migrate procedimiento:

  • Detenga todos los grabadores que interactúen con la tabla fuente.

  • Modifique los lectores y escritores que no sean compatibles de forma nativa con Iceberg para habilitar la compatibilidad con Iceberg.

Por ejemplo:

  • Athena sigue trabajando sin modificaciones.

  • Spark requiere:

    • Archivos Iceberg Java Archive (JAR) que se incluirán en la ruta de clases (consulte las secciones Cómo trabajar con Iceberg en Amazon EMR y Trabajar con Iceberg AWS Glue en las secciones anteriores de esta guía).

    • Las siguientes configuraciones del catálogo de sesiones de Spark (se utilizan SparkSessionCatalog para añadir compatibilidad con Iceberg y, al mismo tiempo, mantener las funcionalidades de catálogo integradas para tablas que no son de Iceberg):

      • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

      • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

      • "spark.sql.catalog.spark_catalog.type":"glue"

      • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

Tras ejecutar el procedimiento, puedes reiniciar las grabadoras con su nueva configuración de Iceberg.

Actualmente, el migrate procedimiento no es compatible con el AWS Glue Data Catalog, porque el catálogo de datos no admite la RENAME operación. Por lo tanto, le recomendamos que utilice este procedimiento únicamente cuando trabaje con Hive Metastore. Si utiliza el catálogo de datos, consulte la siguiente sección para ver un enfoque alternativo.

Puede ejecutar el migrate procedimiento en todos los modelos de implementación de Amazon EMR (Amazon EMR activado, Amazon EC2 EMR en EKS, EMR Serverless), pero requiere una conexión configurada a Hive AWS Glue Metastore. Amazon EMR on EC2 es la opción recomendada porque ofrece una configuración integrada de Hive Metastore, que minimiza la complejidad de la configuración.

Para probar la migración in situ con el procedimiento migrate Spark desde un Amazon EMR EC2 en un clúster configurado con Hive Metastore, sigue estos pasos:

  1. Inicie una aplicación de Spark y configure la sesión de Spark para usar la implementación del catálogo de Iceberg Hive. Por ejemplo, si utiliza la pyspark CLI:

    pyspark --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive
  2. Cree una products tabla en Hive Metastore. Esta es la tabla de origen, que ya existe en una migración típica.

    1. Cree la tabla Hive products externa en Hive Metastore para que apunte a los datos existentes en Amazon S3:

      spark.sql(f""" CREATE EXTERNAL TABLE products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; """ )
    2. Agregue las particiones existentes mediante el comando: MSCK REPAIR TABLE

      spark.sql(f""" MSCK REPAIR TABLE products """ )
    3. Confirme que la tabla contiene datos ejecutando una SELECT consulta:

      spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)

      Resultado de ejemplo:

      Ejemplo del resultado de la validación de datos durante la migración de la tabla Iceberg.
  3. Utilice el procedimiento de Iceberg: migrate

    df_res=spark.sql(f""" CALL system.migrate( table => 'default.products' ) """ ) df_res.show()

    El marco de datos de salida contiene migrated_files_count (el número de archivos que se agregaron a la tabla Iceberg):

    Ejemplo de resultado de la validación del recuento de archivos durante la migración de la tabla Iceberg.
  4. Confirme que se creó la tabla de respaldo:

    spark.sql("show tables").show()

    Resultado de ejemplo:

    Ejemplo de resultado de la validación de copias de seguridad durante la migración de la tabla Iceberg.
  5. Valide la operación consultando la tabla de Iceberg:

    spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)
Notas
  • Tras ejecutar el procedimiento, todos los procesos actuales que consultan o escriben en la tabla de origen se verán afectados si no están configurados correctamente con el soporte de Iceberg. Por lo tanto, le recomendamos que siga estos pasos:

    1. Detenga todos los procesos utilizando la tabla de origen antes de la migración.

    2. Realice la migración.

    3. Reactive los procesos utilizando la configuración de Iceberg adecuada.

  • Si se producen modificaciones en los archivos de datos durante el proceso de migración (se añaden archivos nuevos o se eliminan archivos), la tabla generada no estará sincronizada. Para ver las opciones de sincronización, consulte Mantener sincronizadas las tablas de Iceberg tras una migración local más adelante en esta sección.

Replicar el procedimiento de migración de tablas en AWS Glue Data Catalog

Puede replicar el resultado del procedimiento de migración AWS Glue Data Catalog (haciendo una copia de seguridad de la tabla original y sustituyéndola por una tabla Iceberg) siguiendo estos pasos:

  1. Utilice el procedimiento de captura de pantalla para crear una nueva tabla de iceberg que apunte a los archivos de datos de la tabla original.

  2. Haga una copia de seguridad de los metadatos de la tabla original en el catálogo de datos:

    1. Utilice la GetTableAPI para recuperar la definición de la tabla de origen.

    2. Usa la GetPartitionsAPI para recuperar la definición de partición de la tabla fuente.

    3. Utilice la CreateTableAPI para crear una tabla de respaldo en el catálogo de datos.

    4. Utilice la BatchCreatePartitionAPI CreatePartitiono para registrar las particiones en la tabla de respaldo del catálogo de datos.

  3. Cambie la propiedad de la tabla gc.enabled Iceberg false a para habilitar todas las operaciones de la tabla.

  4. Elimine la tabla original.

  5. Localice el archivo JSON de metadatos de la tabla Iceberg en la carpeta de metadatos de la ubicación raíz de la tabla.

  6. Registre la nueva tabla en el catálogo de datos mediante el procedimiento register_table con el nombre de la tabla original y la ubicación del metadata.json archivo que se creó mediante el procedimiento: snapshot

    spark.sql(f""" CALL system.register_table( table => 'mydb.products', metadata_file => '{iceberg_metadata_file}' ) """ ).show(truncate=False)

Mantener sincronizadas las tablas de Iceberg tras la migración in situ

El add_files procedimiento proporciona una forma flexible de incorporar los datos existentes en las tablas de Iceberg. En concreto, registra los archivos de datos existentes (como los archivos Parquet) haciendo referencia a sus rutas absolutas en la capa de metadatos de Iceberg. De forma predeterminada, el procedimiento agrega archivos de todas las particiones de tabla a una tabla de Iceberg, pero puede agregar archivos de particiones específicas de forma selectiva. Este enfoque selectivo es particularmente útil en varios escenarios:

  • Cuando se agregan nuevas particiones a la tabla de origen después de la migración inicial.

  • Cuando se agregan o eliminan archivos de datos de las particiones existentes después de la migración inicial. Sin embargo, si se vuelven a añadir particiones modificadas, primero se debe eliminar la partición. Más adelante en esta sección se proporciona más información al respecto.

Estas son algunas consideraciones para utilizar el add_file procedimiento después de realizar (snapshotomigrate) la migración in situ, a fin de mantener la nueva tabla de Iceberg sincronizada con los archivos de datos de origen:

  • Cuando se agreguen nuevos datos a las nuevas particiones de la tabla de origen, utilice el add_files procedimiento con la partition_filter opción de incorporar estas adiciones de forma selectiva en la tabla de Iceberg:

    spark.sql(f""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)

    o bien:

    spark.sql(f""" CALL system.add_files( source_table => '`parquet`.`s3://amzn-s3-demo-bucket/products/`', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)
  • El add_files procedimiento busca archivos en toda la tabla de origen o en particiones específicas cuando se especifica la partition_filter opción e intenta añadir todos los archivos que encuentre a la tabla de Iceberg. De forma predeterminada, la propiedad del check_duplicate_files procedimiento está establecida entrue, lo que impide que el procedimiento se ejecute si los archivos ya existen en la tabla Iceberg. Esto es importante porque no hay una opción integrada para omitir los archivos agregados anteriormente y, al deshabilitarlos, los archivos se check_duplicate_files añadirán dos veces, lo que generará duplicados. Cuando se agreguen nuevos archivos a la tabla de origen, siga estos pasos:

    1. Para particiones nuevas, add_files utilícelo con partition_filter a para importar solo los archivos de la nueva partición.

    2. En el caso de las particiones existentes, elimine primero la partición de la tabla de Iceberg y, a continuación, vuelva a ejecutarla add_files para esa partición, especificando la. partition_filter Por ejemplo:

      # We initially perform in-place migration with snapshot spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False) # Then on the source table, some new files were generated under the category='electronics' partition. Example: spark.sql(""" INSERT INTO mydb.products VALUES (1003, 'Tablet', 'electronics') """) # We delete the modified partition from the Iceberg table. Note this is a metadata operation only spark.sql(""" DELETE FROM mydb.products_iceberg WHERE category = 'electronics' """) # We add_files from the modified partition spark.sql(""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ) """).show(truncate=False)
nota

Cada add_files operación genera una nueva instantánea de la tabla Iceberg con datos adjuntos.

Elegir la estrategia de migración local adecuada

Para elegir la mejor estrategia de migración local, tenga en cuenta las preguntas de la siguiente tabla.

Pregunta

Recomendación

Explicación

¿Desea realizar una migración rápida sin tener que volver a escribir los datos y, al mismo tiempo, mantener accesibles las tablas de Hive e Iceberg para realizar pruebas o realizar una transición gradual?

snapshotprocedimiento seguido de procedimiento add_files

Utilice el snapshot procedimiento para crear una nueva tabla Iceberg clonando el esquema y haciendo referencia a los archivos de datos, sin modificar la tabla de origen. Utilice el add_files procedimiento para incorporar las particiones que se agregaron o modificaron después de la migración, teniendo en cuenta que volver a agregar las particiones modificadas requiere primero eliminarlas.

¿Utiliza Hive Metastore y desea reemplazar la tabla de Hive por una tabla de Iceberg inmediatamente, sin tener que volver a escribir los datos?

migrateadd_filesprocedimiento seguido de procedimiento

Utilice migrate este procedimiento para crear una tabla Iceberg, hacer una copia de seguridad de la tabla de origen y sustituir la tabla original por la versión Iceberg.

Nota: Esta opción es compatible con Hive Metastore, pero no con. AWS Glue Data Catalog

Utilice el add_files procedimiento para incorporar las particiones que se agregaron o modificaron después de la migración, teniendo en cuenta que volver a añadir las particiones modificadas requiere primero eliminarlas.

¿Está utilizando la tabla Hive por una tabla Iceberg AWS Glue Data Catalog y desea sustituirla inmediatamente, sin tener que volver a escribir los datos?

Adaptación del migrate procedimiento, seguida del procedimiento add_files

Replicar migrate el comportamiento del procedimiento:

  1. Se utiliza snapshot para crear una tabla de iceberg.

  2. Realice una copia de seguridad de los metadatos de la tabla original mediante AWS Glue APIs.

  3. Actívela gc.enabled en las propiedades de la tabla Iceberg.

  4. Elimine la tabla original.

  5. Se utiliza register_table para crear una nueva entrada de tabla con el nombre original.

Nota: Esta opción requiere la gestión manual de las llamadas a la AWS Glue API para la copia de seguridad de los metadatos.

Utilice el add_files procedimiento para incorporar las particiones que se agregaron o modificaron después de la migración. Tenga en cuenta que, para volver a agregar las particiones modificadas, primero es necesario eliminarlas.