就地迁移 - AWS 规范性指导

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

就地迁移

就地迁移无需重写所有数据文件。取而代之的是生成 Iceberg 元数据文件并将其链接到您的现有数据文件。这种方法通常更快、更具成本效益,特别是对于具有兼容文件格式(例如 Parquet、Avro 和 ORC)的大型数据集或表。

注意

迁移到 Amazon S3 表时,不能使用就地迁移。

Iceberg 为实现就地迁移提供了两个主要选项:

  • 使用快照过程创建新的 Iceberg 表,同时保持源表不变。有关更多信息,请参阅 Iceberg 文档中的快照表

  • 使用迁移过程创建新的 Iceberg 表作为源表的替换。有关更多信息,请参阅 Iceberg 文档中的迁移表。尽管此过程适用于 Hive Metastore (HMS),但它目前与 Hive Metastore 不兼容。 AWS Glue Data Catalog本指南后 AWS Glue Data Catalog面部分的 “复制表迁移过程” 提供了一种解决方法,可使用数据目录实现类似的结果。

使用snapshot或执行就地迁移后migrate,某些数据文件可能仍处于未迁移状态。当写入者在迁移期间或迁移之后继续写入源表时,通常会发生这种情况。要将这些剩余文件合并到 Iceberg 表中,可以使用 add_f iles 过程。有关更多信息,请参阅 Iceberg 文档中的添加文件

假设你有一个基于 Parquet 的桌子,该products表是在 Athena 中创建和填充的,如下所示:

CREATE EXTERNAL TABLE mydb.products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; INSERT INTO mydb.products VALUES (1001, 'Smartphone', 'electronics'), (1002, 'Laptop', 'electronics'), (2001, 'T-Shirt', 'clothing'), (2002, 'Jeans', 'clothing');

以下各节说明如何在此表中使用snapshotmigrate过程。

选项 1:快照过程

snapshot过程创建一个新的 Iceberg 表,该表具有不同的名称,但会复制源表的架构和分区。此操作使源表在操作期间和操作之后都完全不变。它可以有效地创建表的轻量级副本,这对于测试场景或数据探索特别有用,而不会冒修改原始数据源的风险。这种方法可以实现一个过渡期,在这个过渡期,原始表和 Iceberg 表都保持可用(参见本节末尾的注释)。测试完成后,您可以通过将所有写入者和读者过渡到新表来将新 Iceberg 表移至生产环境。

你可以在任何亚马逊 EMR 部署模式中使用 Spark 来运行该snapshot过程(例如,开启亚马逊 EMR,在 EKS 上使用 Amazon EC2 EMR,EMR Serverless)和。 AWS Glue

要使用 snapshot Spark 程序测试就地迁移,请执行以下步骤:

  1. 启动 Spark 应用程序并使用以下设置配置 Spark 会话:

    • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

    • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

    • "spark.sql.catalog.spark_catalog.type":"glue"

    • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

  2. 运行以下snapshot过程创建指向原始表数据文件的新 Iceberg 表:

    spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False)

    输出 dataFrame 包含imported_files_count(已添加的文件数)。

  3. 通过查询来验证新表:

    spark.sql(f""" SELECT * FROM mydb.products_iceberg LIMIT 10 """ ).show(truncate=False)
备注
  • 运行该过程后,对源表的任何数据文件修改都将使生成的表失去同步。您添加的新文件将在 Iceberg 表中不可见,而您删除的文件将影响 Iceberg 表中的查询功能。要避免同步问题,请执行以下操作:

  • 使用该snapshot过程时,该gc.enabled属性将在创建的 Iceberg 表的表属性true中设置为。此设置禁止诸如expire_snapshotsremove_orphan_files、或PURGE选项之类DROP TABLE的操作,这些操作会以物理方式删除数据文件。仍然允许不直接影响源文件的 Iceberg 删除或合并操作。 

  • 要使您的新 Iceberg 表完全正常运行,并且对物理删除数据文件的操作没有限制,您可以将gc.enabled表属性更改为false。但是,此设置将允许影响源数据文件的操作,这可能会破坏对原始表的访问。因此,只有在不再需要维护原始表的功能时才更改该gc.enabled属性。例如:

    spark.sql(f""" ALTER TABLE mydb.products_iceberg SET TBLPROPERTIES ('gc.enabled' = 'false'); """)

选项 2:迁移程序

migrate过程创建一个新的 Iceberg 表,该表的名称、架构和分区与源表相同。当此过程运行时,它会锁定源表并将其重命名为<table_name>_BACKUP_(或由backup_table_name过程参数指定的自定义名称)。

注意

如果将drop_backup过程参数设置为true,则原始表将不会保留为备份。

因此,migrate表过程要求在执行操作之前停止所有影响源表的修改。在运行该migrate过程之前:

  • 停止所有与源表交互的写入器。

  • 修改原生不支持 Iceberg 的读者和作者以启用 Iceberg 支持。

例如:

  • Athena 继续工作,无需任何修改。

  • Spark 需要:

    • 要包含在类路径中的 Iceberg Java 存档 (JAR) 文件(参见本指南前面章节中的 “在 Amazon EMR 中使用 Iceb erg” 和 “使用 Iceberg”)。 AWS Glue

    • SparkSessionCatalog以下 Spark 会话目录配置(用于添加 Iceberg 支持,同时维护非 Iceberg 表的内置目录功能):

      • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

      • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

      • "spark.sql.catalog.spark_catalog.type":"glue"

      • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

运行该过程后,您可以使用新的 Iceberg 配置重新启动写入器。

目前,该migrate过程与不兼容 AWS Glue Data Catalog,因为数据目录不支持该RENAME操作。因此,我们建议您仅在使用 Hive Metastore 时使用此过程。如果您使用的是数据目录,请参阅下一节以了解替代方法。

你可以在所有亚马逊 EMR 部署模型(开启亚马逊 EMR,在 EKS 上运行 Amazon EMR,在 EKS 上运行 EMR Serverless)上运行该migrate程序,但它需要配置与 Hive Metast AWS Glue ore 的连接。 EC2建议选择 Amazon EMR on EC2 ,因为它提供了内置 Hive Metastore 配置,可最大限度地降低设置复杂性。

要使用 migrate Spark 程序在配置了 Hive Metastore 的集群 EC2 上测试来自亚马逊 EMR 的就地迁移,请执行以下步骤:

  1. 启动 Spark 应用程序并将 Spark 会话配置为使用 Iceberg Hive 目录实现。例如,如果你使用的是 pyspark CLI:

    pyspark --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive
  2. 在 Hive 元数据仓中创建products表。这是源表,它已经存在于典型的迁移中。

    1. 在 Hive Metastore 中创建products外部 Hive 表以指向 Amazon S3 中的现有数据:

      spark.sql(f""" CREATE EXTERNAL TABLE products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; """ )
    2. 使用MSCK REPAIR TABLE以下命令添加现有分区:

      spark.sql(f""" MSCK REPAIR TABLE products """ )
    3. 通过运行SELECT查询来确认表中是否包含数据:

      spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)

      示例输出:

      Iceberg 表迁移期间数据验证的输出示例。
  3. 使用 Iceberg migrate 程序:

    df_res=spark.sql(f""" CALL system.migrate( table => 'default.products' ) """ ) df_res.show()

    输出 dataFrame 包含migrated_files_count(添加到 Iceberg 表中的文件数量):

    Iceberg 表迁移期间文件计数验证的输出示例。
  4. 确认备份表已创建:

    spark.sql("show tables").show()

    示例输出:

    Iceberg 表迁移期间备份验证的输出示例。
  5. 通过查询 Iceberg 表来验证操作:

    spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)
备注
  • 运行该过程后,如果当前所有查询或写入源表的进程未正确配置 Iceberg 支持,则这些进程都将受到影响。因此,我们建议您按照以下步骤操作:

    1. 迁移前使用源表停止所有进程。

    2. 执行迁移。

    3. 使用正确的 Iceberg 设置重新激活进程。

  • 如果在迁移过程中修改了数据文件(添加了新文件或删除了文件),则生成的表将不同步。有关同步选项,请参阅本节后面的就地迁移后使 Iceberg 表保持同步。

在中复制表迁移过程 AWS Glue Data Catalog

您可以按照以下步骤在中复制迁移过程的结果 AWS Glue Data Catalog (备份原始表并将其替换为 Iceberg 表):

  1. 使用快照过程创建新的 Iceberg 表,该表指向原始表的数据文件。

  2. 备份数据目录中的原始表元数据:

    1. 使用 GetTableAPI 检索源表定义。

    2. 使用 GetPartitionsAPI 检索源表分区定义。

    3. 使用 CreateTableAPI 在数据目录中创建备份表。

    4. 使用CreatePartitionBatchCreatePartitionAPI 将分区注册到数据目录中的备份表。

  3. gc.enabled Iceberg 表属性更改为false,以启用全表操作。

  4. 删除原始表。

  5. 在表根位置的元数据文件夹中找到 Iceberg 表元数据 JSON 文件。

  6. 使用 regi ster_table 过程在数据目录中注册新表,并使用该过程创建的原始表名和metadata.json文件的位置:snapshot

    spark.sql(f""" CALL system.register_table( table => 'mydb.products', metadata_file => '{iceberg_metadata_file}' ) """ ).show(truncate=False)

就地迁移后保持 Iceberg 表同步

add_files过程提供了一种灵活的方法来将现有数据合并到 Iceberg 表中。具体而言,它通过在 Iceberg 的元数据层中引用现有数据文件(例如 Parquet 文件)的绝对路径来注册它们。默认情况下,该过程会将所有表分区中的文件添加到 Iceberg 表,但您可以有选择地添加来自特定分区的文件。这种选择性方法在以下几种情况下特别有用:

  • 在初始迁移后向源表中添加新分区时。

  • 在初始迁移后向现有分区中添加或删除数据文件时。但是,重新添加修改后的分区需要先删除分区。本节稍后将提供有关此内容的更多信息。

以下是在执行就地迁移(snapshotmigrate)后使用该add_file过程以使新的 Iceberg 表与源数据文件保持同步的一些注意事项:

  • 将新数据添加到源表中的新分区时,请使用带有partition_filter选项的add_files过程有选择地将这些新增数据合并到 Iceberg 表中:

    spark.sql(f""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)

    或者:

    spark.sql(f""" CALL system.add_files( source_table => '`parquet`.`s3://amzn-s3-demo-bucket/products/`', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)
  • 当您指定partition_filter选项时,该add_files过程会扫描整个源表或特定分区中的文件,并尝试将其找到的所有文件添加到 Iceberg 表中。默认情况下,check_duplicate_files过程属性设置为true,如果文件已存在于 Iceberg 表中,则程序将无法运行。这很重要,因为没有内置选项可以跳过先前添加的文件,禁用check_duplicate_files会导致文件被添加两次,从而创建重复的文件。将新文件添加到源表时,请按照以下步骤操作:

    1. 对于新分区,add_files与一起使用partition_filter,仅从新分区导入文件。

    2. 对于现有分区,请先从 Iceberg 表中删除该分区,然后对该分区重新运行add_files,指定。partition_filter例如:

      # We initially perform in-place migration with snapshot spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False) # Then on the source table, some new files were generated under the category='electronics' partition. Example: spark.sql(""" INSERT INTO mydb.products VALUES (1003, 'Tablet', 'electronics') """) # We delete the modified partition from the Iceberg table. Note this is a metadata operation only spark.sql(""" DELETE FROM mydb.products_iceberg WHERE category = 'electronics' """) # We add_files from the modified partition spark.sql(""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ) """).show(truncate=False)
注意

每个add_files操作都会生成一个包含附加数据的新的 Iceberg 表快照。

选择正确的就地迁移策略

要选择最佳的就地迁移策略,请考虑下表中的问题。

问题

建议

说明

您是否想在不重写数据的情况下快速迁移,同时保持 Hive 和 Iceberg 表可供测试或逐步过渡使用?

snapshot程序之后是add_files程序

使用该snapshot过程通过克隆架构并引用数据文件来创建新的 Iceberg 表,而无需修改源表。使用此add_files过程合并迁移后添加或修改的分区,注意重新添加修改后的分区需要先删除分区。

你是否在使用 Hive Metastore,是否想在不重写数据的情况下立即用 Iceberg 表替换 Hive 表?

migrate程序之后是add_files程序

使用该migrate过程创建 Iceberg 表,备份源表,然后用 Iceberg 版本替换原始表。

注意:此选项与 Hive Metastore 兼容,但不兼容。 AWS Glue Data Catalog

使用此add_files过程合并迁移后添加或修改的分区,注意重新添加修改后的分区需要先删除分区。

您是否正在使用 AWS Glue Data Catalog Hive 表,是否要立即用 Iceberg 表替换 Hive 表,而不必重写数据?

调整migrate程序,然后是add_files程序

复制migrate过程行为:

  1. snapshot用于创建冰山表。

  2. 使用备份原始表元数据 AWS Glue APIs。

  3. gc.enabled在 Iceberg 表属性上启用。

  4. 删除原始表。

  5. register_table用于使用原始名称创建新的表条目。

注意:此选项需要手动处理元数据备份 AWS Glue 的 API 调用。

使用此add_files过程合并迁移后添加或修改的分区,注意重新添加修改后的分区需要先删除分区。