Migração local - AWS Orientação prescritiva

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Migração local

A migração local elimina a necessidade de reescrever todos os seus arquivos de dados. Em vez disso, os arquivos de metadados do Iceberg são gerados e vinculados aos seus arquivos de dados existentes. Esse método geralmente é mais rápido e econômico, especialmente para grandes conjuntos de dados ou tabelas com formatos de arquivo compatíveis, como Parquet, Avro e ORC.

nota

A migração local não pode ser usada ao migrar para as tabelas do Amazon S3.

O Iceberg oferece duas opções principais para implementar a migração local:

Depois de realizar a migração local usando um snapshot oumigrate, alguns arquivos de dados podem permanecer sem migração. Isso geralmente acontece quando os gravadores continuam gravando na tabela de origem durante ou após a migração. Para incorporar esses arquivos restantes em sua tabela Iceberg, você pode usar o procedimento add_files. Para obter mais informações, consulte Adicionar arquivos na documentação do Iceberg.

Digamos que você tenha uma products tabela baseada em Parquet que foi criada e preenchida no Athena da seguinte forma:

CREATE EXTERNAL TABLE mydb.products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; INSERT INTO mydb.products VALUES (1001, 'Smartphone', 'electronics'), (1002, 'Laptop', 'electronics'), (2001, 'T-Shirt', 'clothing'), (2002, 'Jeans', 'clothing');

As seções a seguir explicam como você pode usar os migrate procedimentos snapshot e com essa tabela.

Opção 1: procedimento de captura instantânea

O snapshot procedimento cria uma nova tabela Iceberg que tem um nome diferente, mas replica o esquema e o particionamento da tabela de origem. Essa operação deixa a tabela de origem completamente inalterada durante e após a ação. Ele cria efetivamente uma cópia leve da tabela, o que é particularmente útil para testar cenários ou explorar dados sem correr o risco de modificações na fonte de dados original. Essa abordagem permite um período de transição em que a tabela original e a tabela Iceberg permanecem disponíveis (consulte as notas no final desta seção). Quando o teste estiver concluído, você poderá mover sua nova tabela Iceberg para produção fazendo a transição de todos os escritores e leitores para a nova tabela.

Você pode executar o snapshot procedimento usando o Spark em qualquer modelo de implantação do Amazon EMR (por exemplo, Amazon EMR on, Amazon EMR on EKS EC2, EMR Serverless) e. AWS Glue

Para testar a migração local com o procedimento do snapshot Spark, siga estas etapas:

  1. Inicie um aplicativo Spark e configure a sessão do Spark com as seguintes configurações:

    • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

    • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

    • "spark.sql.catalog.spark_catalog.type":"glue"

    • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

  2. Execute o snapshot procedimento para criar uma nova tabela Iceberg que aponte para os arquivos de dados da tabela original:

    spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False)

    O quadro de dados de saída contém o imported_files_count (o número de arquivos que foram adicionados).

  3. Valide a nova tabela consultando-a:

    spark.sql(f""" SELECT * FROM mydb.products_iceberg LIMIT 10 """ ).show(truncate=False)
Observações
  • Depois de executar o procedimento, qualquer modificação no arquivo de dados na tabela de origem dessincronizará a tabela gerada. Os novos arquivos que você adicionar não estarão visíveis na tabela Iceberg, e os arquivos que você removeu afetarão os recursos de consulta na tabela Iceberg. Para evitar os problemas de sincronização:

    • Se a nova tabela Iceberg for destinada ao uso em produção, interrompa todos os processos gravados na tabela original e redirecione-os para a nova tabela.

    • Se você precisar de um período de transição ou se a nova tabela do Iceberg for para fins de teste, consulte Mantendo as tabelas do Iceberg sincronizadas após a migração local, mais adiante nesta seção, para obter orientação sobre como manter a sincronização das tabelas.

  • Quando você usa o snapshot procedimento, a gc.enabled propriedade é definida true nas propriedades da tabela Iceberg criada. Essa configuração proíbe ações como expire_snapshotsremove_orphan_files, ou DROP TABLE com a PURGE opção, que excluiriam fisicamente os arquivos de dados. As operações de exclusão ou mesclagem do Iceberg, que não afetam diretamente os arquivos de origem, ainda são permitidas. 

  • Para tornar sua nova tabela Iceberg totalmente funcional, sem limites nas ações que excluem fisicamente os arquivos de dados, você pode alterar a propriedade da gc.enabled tabela parafalse. No entanto, essa configuração permitirá ações que afetem os arquivos de dados de origem, o que pode corromper o acesso à tabela original. Portanto, altere a gc.enabled propriedade somente se você não precisar mais manter a funcionalidade da tabela original. Por exemplo:

    spark.sql(f""" ALTER TABLE mydb.products_iceberg SET TBLPROPERTIES ('gc.enabled' = 'false'); """)

Opção 2: procedimento de migração

O migrate procedimento cria uma nova tabela Iceberg que tem o mesmo nome, esquema e particionamento da tabela de origem. Quando esse procedimento é executado, ele bloqueia a tabela de origem e a renomeia para <table_name>_BACKUP_ (ou um nome personalizado especificado pelo parâmetro do backup_table_name procedimento).

nota

Se você definir o parâmetro do drop_backup procedimento comotrue, a tabela original não será mantida como backup.

Consequentemente, o procedimento da migrate tabela exige que todas as modificações que afetam a tabela de origem sejam interrompidas antes que a ação seja executada. Antes de executar o migrate procedimento:

  • Pare todos os escritores que interagem com a tabela de origem.

  • Modifique leitores e escritores que não oferecem suporte nativo ao Iceberg para ativar o suporte ao Iceberg.

Por exemplo:

  • Athena continua trabalhando sem modificações.

  • O Spark requer:

    • Arquivos Iceberg Java Archive (JAR) a serem incluídos no classpath (consulte Trabalhando com o Iceberg no Amazon EMR e Trabalhando com o Iceberg nas seções anteriores deste guia). AWS Glue

    • As seguintes configurações do catálogo de sessões do Spark (usadas SparkSessionCatalog para adicionar suporte ao Iceberg e, ao mesmo tempo, manter as funcionalidades integradas do catálogo para tabelas que não são do Iceberg):

      • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

      • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

      • "spark.sql.catalog.spark_catalog.type":"glue"

      • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

Depois de executar o procedimento, você pode reiniciar seus gravadores com a nova configuração do Iceberg.

Atualmente, o migrate procedimento não é compatível com o AWS Glue Data Catalog, porque o Catálogo de Dados não suporta a RENAME operação. Portanto, recomendamos que você use esse procedimento somente quando estiver trabalhando com o Hive Metastore. Se você estiver usando o Catálogo de Dados, consulte a próxima seção para obter uma abordagem alternativa.

Você pode executar o migrate procedimento em todos os modelos de implantação do Amazon EMR (Amazon EMR ativado, Amazon EC2 EMR no EKS, EMR Serverless), mas isso requer uma conexão configurada com o Hive AWS Glue Metastore. O Amazon EMR ativado EC2 é a opção recomendada porque fornece uma configuração integrada do Hive Metastore, o que minimiza a complexidade da configuração.

Para testar a migração local com o procedimento migrate Spark de um Amazon EMR em um EC2 cluster configurado com o Hive Metastore, siga estas etapas:

  1. Inicie um aplicativo Spark e configure a sessão do Spark para usar a implementação do catálogo Iceberg Hive. Por exemplo, se você estiver usando a pyspark CLI:

    pyspark --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive
  2. Crie uma products tabela no Hive Metastore. Essa é a tabela de origem, que já existe em uma migração típica.

    1. Crie a tabela products externa do Hive no Hive Metastore para apontar para os dados existentes no Amazon S3:

      spark.sql(f""" CREATE EXTERNAL TABLE products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; """ )
    2. Adicione as partições existentes usando o MSCK REPAIR TABLE comando:

      spark.sql(f""" MSCK REPAIR TABLE products """ )
    3. Confirme se a tabela contém dados executando uma SELECT consulta:

      spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)

      Exemplo de resultado:

      Exemplo de saída da validação de dados durante a migração da tabela Iceberg.
  3. Use o migrate procedimento Iceberg:

    df_res=spark.sql(f""" CALL system.migrate( table => 'default.products' ) """ ) df_res.show()

    O quadro de dados de saída contém migrated_files_count (os números de arquivos que foram adicionados à tabela Iceberg):

    Exemplo de saída da validação da contagem de arquivos durante a migração da tabela Iceberg.
  4. Confirme se a tabela de backup foi criada:

    spark.sql("show tables").show()

    Exemplo de resultado:

    Exemplo de saída da validação de backup durante a migração da tabela Iceberg.
  5. Valide a operação consultando a tabela Iceberg:

    spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)
Observações
  • Depois de executar o procedimento, todos os processos atuais que consultam ou gravam na tabela de origem serão afetados se não estiverem configurados corretamente com o suporte do Iceberg. Portanto, recomendamos que você siga estas etapas:

    1. Pare todos os processos usando a tabela de origem antes da migração.

    2. Execute a migração.

    3. Reative os processos usando as configurações adequadas do Iceberg.

  • Se ocorrerem modificações no arquivo de dados durante o processo de migração (novos arquivos forem adicionados ou removidos), a tabela gerada ficará fora de sincronia. Para opções de sincronização, consulte Mantendo as tabelas do Iceberg sincronizadas após a migração local, mais adiante nesta seção.

Replicando o procedimento de migração de tabelas no AWS Glue Data Catalog

Você pode replicar o resultado do procedimento de migração em AWS Glue Data Catalog (fazendo backup da tabela original e substituindo-a por uma tabela Iceberg) seguindo estas etapas:

  1. Use o procedimento de captura instantânea para criar uma nova tabela Iceberg que aponta para os arquivos de dados da tabela original.

  2. Faça backup dos metadados da tabela original no Catálogo de Dados:

    1. Use a GetTableAPI para recuperar a definição da tabela de origem.

    2. Use a GetPartitionsAPI para recuperar a definição da partição da tabela de origem.

    3. Use a CreateTableAPI para criar uma tabela de backup no catálogo de dados.

    4. Use a BatchCreatePartitionAPI CreatePartitionor para registrar partições na tabela de backup no Catálogo de Dados.

  3. Altere a propriedade da tabela gc.enabled Iceberg false para ativar as operações completas da tabela.

  4. Descarte a tabela original.

  5. Localize o arquivo JSON de metadados da tabela Iceberg na pasta de metadados da localização raiz da tabela.

  6. Registre a nova tabela no Catálogo de Dados usando o procedimento register_table com o nome da tabela original e a localização do metadata.json arquivo que foi criado pelo procedimento: snapshot

    spark.sql(f""" CALL system.register_table( table => 'mydb.products', metadata_file => '{iceberg_metadata_file}' ) """ ).show(truncate=False)

Mantendo as tabelas do Iceberg sincronizadas após a migração local

O add_files procedimento fornece uma maneira flexível de incorporar dados existentes às tabelas do Iceberg. Especificamente, ele registra arquivos de dados existentes (como arquivos Parquet) referenciando seus caminhos absolutos na camada de metadados do Iceberg. Por padrão, o procedimento adiciona arquivos de todas as partições da tabela a uma tabela Iceberg, mas você pode adicionar seletivamente arquivos de partições específicas. Essa abordagem seletiva é particularmente útil em vários cenários:

  • Quando novas partições são adicionadas à tabela de origem após a migração inicial.

  • Quando os arquivos de dados são adicionados ou removidos das partições existentes após a migração inicial. No entanto, adicionar novamente partições modificadas exige primeiro a exclusão da partição. Mais informações sobre isso são fornecidas posteriormente nesta seção.

Aqui estão algumas considerações sobre o uso do add_file procedimento após a realização da migração local (snapshotoumigrate), para manter a nova tabela Iceberg sincronizada com os arquivos de dados de origem:

  • Quando novos dados forem adicionados a novas partições na tabela de origem, use o add_files procedimento com a partition_filter opção de incorporar seletivamente essas adições na tabela Iceberg:

    spark.sql(f""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)

    ou:

    spark.sql(f""" CALL system.add_files( source_table => '`parquet`.`s3://amzn-s3-demo-bucket/products/`', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)
  • O add_files procedimento verifica arquivos em toda a tabela de origem ou em partições específicas quando você especifica a partition_filter opção e tenta adicionar todos os arquivos encontrados à tabela Iceberg. Por padrão, a propriedade do check_duplicate_files procedimento é definida comotrue, o que impede a execução do procedimento se os arquivos já existirem na tabela Iceberg. Isso é importante porque não há uma opção integrada para ignorar arquivos adicionados anteriormente, e a desativação check_duplicate_files fará com que os arquivos sejam adicionados duas vezes, criando duplicatas. Quando novos arquivos forem adicionados à tabela de origem, siga estas etapas:

    1. Para novas partições, use add_files com a partition_filter para importar somente arquivos da nova partição.

    2. Para partições existentes, primeiro exclua a partição da tabela Iceberg e, em seguida, execute novamente add_files essa partição, especificando o. partition_filter Por exemplo:

      # We initially perform in-place migration with snapshot spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False) # Then on the source table, some new files were generated under the category='electronics' partition. Example: spark.sql(""" INSERT INTO mydb.products VALUES (1003, 'Tablet', 'electronics') """) # We delete the modified partition from the Iceberg table. Note this is a metadata operation only spark.sql(""" DELETE FROM mydb.products_iceberg WHERE category = 'electronics' """) # We add_files from the modified partition spark.sql(""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ) """).show(truncate=False)
nota

Cada add_files operação gera um novo instantâneo da tabela Iceberg com dados anexados.

Escolhendo a estratégia certa de migração local

Para escolher a melhor estratégia de migração local, considere as perguntas na tabela a seguir.

Pergunta

Recomendação

Explicação

Você quer migrar rapidamente sem reescrever dados e, ao mesmo tempo, manter as tabelas Hive e Iceberg acessíveis para testes ou transições graduais?

snapshotprocedimento seguido pelo add_files procedimento

Use o snapshot procedimento para criar uma nova tabela Iceberg clonando o esquema e referenciando arquivos de dados, sem modificar a tabela de origem. Use o add_files procedimento para incorporar partições que foram adicionadas ou modificadas após a migração, observando que a readição de partições modificadas exige primeiro a exclusão da partição.

Você está usando o Hive Metastore e deseja substituir sua tabela do Hive por uma tabela Iceberg imediatamente, sem reescrever os dados?

migrateprocedimento seguido pelo add_files procedimento

Use o migrate procedimento para criar uma tabela Iceberg, fazer backup da tabela de origem e substituir a tabela original pela versão Iceberg.

Observação: essa opção é compatível com o Hive Metastore, mas não com o. AWS Glue Data Catalog

Use o add_files procedimento para incorporar partições que foram adicionadas ou modificadas após a migração, observando que a readição de partições modificadas exige primeiro a exclusão da partição.

Você está usando AWS Glue Data Catalog e deseja substituir sua tabela Hive por uma tabela Iceberg imediatamente, sem reescrever os dados?

Adaptação do migrate procedimento, seguida pelo add_files procedimento

Replique o comportamento migrate do procedimento:

  1. Use snapshot para criar uma tabela Iceberg.

  2. Faça backup dos metadados da tabela original usando AWS Glue APIs.

  3. Ative gc.enabled as propriedades da tabela Iceberg.

  4. Descarte a tabela original.

  5. Use register_table para criar uma nova entrada de tabela com o nome original.

Observação: essa opção exige o tratamento manual de chamadas de AWS Glue API para backup de metadados.

Use o add_files procedimento para incorporar partições que foram adicionadas ou modificadas após a migração, observando que a readição de partições modificadas exige primeiro a exclusão da partição.