Migrazione sul posto - AWS Guida prescrittiva

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Migrazione sul posto

La migrazione sul posto elimina la necessità di riscrivere tutti i file di dati. I file di metadati Iceberg vengono invece generati e collegati ai file di dati esistenti. Questo metodo è in genere più veloce ed economico, soprattutto per set di dati o tabelle di grandi dimensioni con formati di file compatibili come Parquet, Avro e ORC.

Nota

La migrazione sul posto non può essere utilizzata durante la migrazione a Amazon S3 Tables.

Iceberg offre due opzioni principali per implementare la migrazione sul posto:

Dopo aver eseguito la migrazione sul posto utilizzando uno dei due snapshot omigrate, alcuni file di dati potrebbero rimanere non migrati. Ciò accade in genere quando gli autori continuano a scrivere nella tabella di origine durante o dopo la migrazione. Per incorporare questi file rimanenti nella tabella Iceberg, è possibile utilizzare la procedura add_files. Per ulteriori informazioni, consulta Aggiungi file nella documentazione di Iceberg.

Supponiamo che tu abbia una products tabella basata su Parquet che è stata creata e popolata in Athena come segue:

CREATE EXTERNAL TABLE mydb.products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; INSERT INTO mydb.products VALUES (1001, 'Smartphone', 'electronics'), (1002, 'Laptop', 'electronics'), (2001, 'T-Shirt', 'clothing'), (2002, 'Jeans', 'clothing');

Le sezioni seguenti spiegano come utilizzare le procedure snapshot and migrate con questa tabella.

Opzione 1: procedura di istantanea

La snapshot procedura crea una nuova tabella Iceberg con un nome diverso ma replica lo schema e il partizionamento della tabella di origine. Questa operazione lascia la tabella di origine completamente invariata sia durante che dopo l'azione. Crea in modo efficace una copia leggera della tabella, che è particolarmente utile per testare scenari o esplorare i dati senza rischiare di modificare la fonte di dati originale. Questo approccio consente un periodo di transizione in cui sia la tabella originale che la tabella Iceberg rimangono disponibili (vedi le note alla fine di questa sezione). Una volta completato il test, puoi portare la tua nuova tabella Iceberg in produzione trasferendo tutti gli autori e i lettori alla nuova tabella.

Puoi eseguire la snapshot procedura utilizzando Spark in qualsiasi modello di distribuzione Amazon EMR (ad esempio, Amazon EMR on, Amazon EMR su EKS EC2, EMR Serverless) e. AWS Glue

Per testare la migrazione sul posto con la procedura Spark, segui questi passaggisnapshot:

  1. Avvia un'applicazione Spark e configura la sessione Spark con le seguenti impostazioni:

    • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

    • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

    • "spark.sql.catalog.spark_catalog.type":"glue"

    • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

  2. Esegui la snapshot procedura per creare una nuova tabella Iceberg che punti ai file di dati della tabella originale:

    spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False)

    Il dataframe di output contiene imported_files_count (il numero di file che sono stati aggiunti).

  3. Convalida la nuova tabella interrogandola:

    spark.sql(f""" SELECT * FROM mydb.products_iceberg LIMIT 10 """ ).show(truncate=False)
Note
  • Dopo aver eseguito la procedura, qualsiasi modifica del file di dati sulla tabella di origine non sincronizzerà la tabella generata. I nuovi file aggiunti non saranno visibili nella tabella Iceberg e i file rimossi influiranno sulle funzionalità di interrogazione nella tabella Iceberg. Per evitare problemi di sincronizzazione:

    • Se la nuova tabella Iceberg è destinata all'uso in produzione, interrompi tutti i processi che scrivono sulla tabella originale e reindirizzali alla nuova tabella.

    • Se hai bisogno di un periodo di transizione o se la nuova tabella Iceberg è a scopo di test, consulta Mantenere le tabelle Iceberg sincronizzate dopo la migrazione sul posto più avanti in questa sezione per indicazioni su come mantenere la sincronizzazione delle tabelle.

  • Quando si utilizza la snapshot procedura, la gc.enabled proprietà viene impostata sulle proprietà della tabella Iceberg creata. true Questa impostazione proibisce azioni comeexpire_snapshots, o DROP TABLE con l'PURGEopzioneremove_orphan_files, che eliminerebbero fisicamente i file di dati. Le operazioni di eliminazione o unione di Iceberg, che non hanno un impatto diretto sui file sorgente, sono ancora consentite. 

  • Per rendere la tua nuova tabella Iceberg completamente funzionante, senza limiti alle azioni che eliminano fisicamente i file di dati, puoi modificare la proprietà della gc.enabled tabella in. false Tuttavia, questa impostazione consentirà azioni che influiscono sui file di dati di origine, che potrebbero compromettere l'accesso alla tabella originale. Pertanto, modificate la gc.enabled proprietà solo se non è più necessario mantenere la funzionalità della tabella originale. Per esempio:

    spark.sql(f""" ALTER TABLE mydb.products_iceberg SET TBLPROPERTIES ('gc.enabled' = 'false'); """)

Opzione 2: procedura di migrazione

La migrate procedura crea una nuova tabella Iceberg con lo stesso nome, schema e partizionamento della tabella di origine. Quando viene eseguita, questa procedura blocca la tabella di origine e la rinomina in <table_name>_BACKUP_ (o in un nome personalizzato specificato dal parametro della procedura). backup_table_name

Nota

Se si imposta il parametro della drop_backup procedura sutrue, la tabella originale non verrà conservata come backup.

Di conseguenza, la procedura della migrate tabella richiede che tutte le modifiche che influiscono sulla tabella di origine vengano interrotte prima dell'esecuzione dell'azione. Prima di eseguire la migrate procedura:

  • Interrompi tutti gli scrittori che interagiscono con la tabella dei sorgenti.

  • Modifica lettori e scrittori che non supportano nativamente Iceberg per abilitare il supporto Iceberg.

Per esempio:

  • Athena continua a lavorare senza modifiche.

  • Spark richiede:

    • File Iceberg Java Archive (JAR) da includere nel classpath (consulta le sezioni Lavorare con Iceberg in Amazon EMR e Lavorare con Iceberg AWS Glue nelle sezioni precedenti di questa guida).

    • Le seguenti configurazioni del catalogo delle sessioni Spark (utilizzate SparkSessionCatalog per aggiungere il supporto Iceberg mantenendo le funzionalità di catalogo integrate per le tabelle non Iceberg):

      • "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

      • "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"

      • "spark.sql.catalog.spark_catalog.type":"glue"

      • "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"

Dopo aver eseguito la procedura, potete riavviare i vostri masterizzatori con la nuova configurazione di Iceberg.

Attualmente, la migrate procedura non è compatibile con AWS Glue Data Catalog, poiché il Data Catalog non supporta l'RENAMEoperazione. Pertanto, ti consigliamo di utilizzare questa procedura solo quando lavori con Hive Metastore. Se utilizzi il Data Catalog, consulta la sezione successiva per un approccio alternativo.

Puoi eseguire la migrate procedura su tutti i modelli di distribuzione di Amazon EMR (Amazon EMR su, Amazon EMR su EKS, EMR Serverless) AWS Glue e, ma richiede una connessione configurata a Hive Metastore. EC2 Amazon EMR on EC2 è la scelta consigliata perché fornisce una configurazione Hive Metastore integrata, che riduce al minimo la complessità della configurazione.

Per testare la migrazione sul posto con la procedura migrate Spark da un Amazon EMR su un EC2 cluster configurato con Hive Metastore, segui questi passaggi:

  1. Avvia un'applicazione Spark e configura la sessione Spark per utilizzare l'implementazione del catalogo Iceberg Hive. Ad esempio, se utilizzi la pyspark CLI:

    pyspark --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive
  2. Crea una products tabella in Hive Metastore. Questa è la tabella di origine, che esiste già in una migrazione tipica.

    1. Crea la tabella Hive products esterna in Hive Metastore per puntare ai dati esistenti in Amazon S3:

      spark.sql(f""" CREATE EXTERNAL TABLE products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; """ )
    2. Aggiungi le partizioni esistenti utilizzando il comando: MSCK REPAIR TABLE

      spark.sql(f""" MSCK REPAIR TABLE products """ )
    3. Verifica che la tabella contenga dati eseguendo una SELECT query:

      spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)

      Output di esempio:

      Esempio di output derivante dalla convalida dei dati durante la migrazione della tabella Iceberg.
  3. Usa la procedura Iceberg: migrate

    df_res=spark.sql(f""" CALL system.migrate( table => 'default.products' ) """ ) df_res.show()

    Il dataframe di output contiene migrated_files_count (il numero di file che sono stati aggiunti alla tabella Iceberg):

    Esempio di output derivante dalla convalida del conteggio dei file durante la migrazione della tabella Iceberg.
  4. Conferma che la tabella di backup è stata creata:

    spark.sql("show tables").show()

    Output di esempio:

    Esempio di output della convalida del backup durante la migrazione della tabella Iceberg.
  5. Convalida l'operazione interrogando la tabella Iceberg:

    spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)
Note
  • Dopo aver eseguito la procedura, tutti i processi correnti che eseguono query o scrivono nella tabella di origine ne risentiranno se non sono configurati correttamente con il supporto Iceberg. Pertanto, ti consigliamo di seguire questi passaggi:

    1. Arresta tutti i processi utilizzando la tabella di origine prima della migrazione.

    2. Eseguire la migrazione.

    3. Riattiva i processi utilizzando le impostazioni Iceberg appropriate.

  • Se durante il processo di migrazione si verificano modifiche ai file di dati (vengono aggiunti nuovi file o rimossi), la tabella generata non sarà sincronizzata. Per le opzioni di sincronizzazione, vedi Mantenere sincronizzate le tabelle Iceberg dopo la migrazione sul posto più avanti in questa sezione.

Replica della procedura di migrazione delle tabelle in AWS Glue Data Catalog

È possibile replicare il risultato della procedura di migrazione in AWS Glue Data Catalog (eseguire il backup della tabella originale e sostituirla con una tabella Iceberg) seguendo questi passaggi:

  1. Utilizzate la procedura snapshot per creare una nuova tabella Iceberg che punti ai file di dati della tabella originale.

  2. Esegui il backup dei metadati della tabella originale nel Data Catalog:

    1. Utilizza l'GetTableAPI per recuperare la definizione della tabella di origine.

    2. Utilizza l'GetPartitionsAPI per recuperare la definizione della partizione della tabella di origine.

    3. Utilizza l'CreateTableAPI per creare una tabella di backup nel Data Catalog.

    4. Utilizza l'BatchCreatePartitionAPI CreatePartitiono per registrare le partizioni nella tabella di backup nel Data Catalog.

  3. Modifica la proprietà della tabella gc.enabled Iceberg per false abilitare le operazioni complete della tabella.

  4. Rilasciare la tabella originale.

  5. Individua il file JSON dei metadati della tabella Iceberg nella cartella dei metadati della posizione principale della tabella.

  6. Registra la nuova tabella nel Data Catalog utilizzando la procedura register_table con il nome della tabella originale e la posizione del metadata.json file creato dalla procedura: snapshot

    spark.sql(f""" CALL system.register_table( table => 'mydb.products', metadata_file => '{iceberg_metadata_file}' ) """ ).show(truncate=False)

Mantenere sincronizzate le tabelle Iceberg dopo la migrazione sul posto

La add_files procedura fornisce un modo flessibile per incorporare i dati esistenti nelle tabelle Iceberg. In particolare, registra i file di dati esistenti (come i file Parquet) facendo riferimento ai loro percorsi assoluti nel livello di metadati di Iceberg. Per impostazione predefinita, la procedura aggiunge file da tutte le partizioni di tabella a una tabella Iceberg, ma è possibile aggiungere selettivamente file da partizioni specifiche. Questo approccio selettivo è particolarmente utile in diversi scenari:

  • Quando vengono aggiunte nuove partizioni alla tabella di origine dopo la migrazione iniziale.

  • Quando i file di dati vengono aggiunti o rimossi da partizioni esistenti dopo la migrazione iniziale. Tuttavia, la riaggiunta di partizioni modificate richiede prima l'eliminazione delle partizioni. Ulteriori informazioni al riguardo sono fornite più avanti in questa sezione.

Ecco alcune considerazioni sull'utilizzo della add_file procedura dopo l'esecuzione della migrazione sul posto (snapshotomigrate), per mantenere la nuova tabella Iceberg sincronizzata con i file di dati di origine:

  • Quando vengono aggiunti nuovi dati a nuove partizioni nella tabella di origine, utilizzate la add_files procedura con l'partition_filteropzione per incorporare selettivamente queste aggiunte nella tabella Iceberg:

    spark.sql(f""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)

    oppure:

    spark.sql(f""" CALL system.add_files( source_table => '`parquet`.`s3://amzn-s3-demo-bucket/products/`', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)
  • La add_files procedura cerca i file nell'intera tabella di origine o in partizioni specifiche quando specificate l'partition_filteropzione e tenta di aggiungere tutti i file trovati alla tabella Iceberg. Per impostazione predefinita, la proprietà della check_duplicate_files procedura è impostata sutrue, il che impedisce l'esecuzione della procedura se i file esistono già nella tabella Iceberg. Questo è importante perché non esiste un'opzione integrata per ignorare i file aggiunti in precedenza e la disattivazione check_duplicate_files comporterà l'aggiunta di due volte dei file, con la conseguente creazione di duplicati. Quando vengono aggiunti nuovi file alla tabella di origine, procedi nel seguente modo:

    1. Per le nuove partizioni, usa add_files with partition_filter a per importare solo i file dalla nuova partizione.

    2. Per le partizioni esistenti, eliminate prima la partizione dalla tabella Iceberg, quindi eseguitela nuovamente add_files per quella partizione, specificando. partition_filter Per esempio:

      # We initially perform in-place migration with snapshot spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False) # Then on the source table, some new files were generated under the category='electronics' partition. Example: spark.sql(""" INSERT INTO mydb.products VALUES (1003, 'Tablet', 'electronics') """) # We delete the modified partition from the Iceberg table. Note this is a metadata operation only spark.sql(""" DELETE FROM mydb.products_iceberg WHERE category = 'electronics' """) # We add_files from the modified partition spark.sql(""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ) """).show(truncate=False)
Nota

Ogni add_files operazione genera una nuova istantanea della tabella Iceberg con dati aggiunti.

Scelta della giusta strategia di migrazione in atto

Per scegliere la migliore strategia di migrazione sul posto, considera le domande nella tabella seguente.

Domanda

Raccomandazione

Spiegazione

Vuoi migrare rapidamente senza riscrivere i dati mantenendo entrambe le tabelle Hive e Iceberg accessibili per test o transizioni graduali?

snapshotprocedura seguita dalla procedura add_files

Utilizzate la snapshot procedura per creare una nuova tabella Iceberg clonando lo schema e facendo riferimento ai file di dati, senza modificare la tabella di origine. Utilizzate la add_files procedura per incorporare le partizioni che sono state aggiunte o modificate dopo la migrazione, tenendo presente che la riaggiunta di partizioni modificate richiede prima l'eliminazione delle partizioni.

Stai usando Hive Metastore e vuoi sostituire immediatamente la tua tabella Hive con una tabella Iceberg, senza riscrivere i dati?

migrateadd_filesprocedura seguita da procedura

Utilizzare la migrate procedura per creare una tabella Iceberg, eseguire il backup della tabella di origine e sostituire la tabella originale con la versione Iceberg.

Nota: questa opzione è compatibile con Hive Metastore ma non con. AWS Glue Data Catalog

Utilizzate la add_files procedura per incorporare le partizioni che sono state aggiunte o modificate dopo la migrazione, tenendo presente che la riaggiunta di partizioni modificate richiede prima l'eliminazione delle partizioni.

Stai usando AWS Glue Data Catalog e vuoi sostituire immediatamente la tua tabella Hive con una tabella Iceberg, senza riscrivere i dati?

Adattamento della migrate procedura, seguito dalla procedura add_files

Replica il comportamento migrate della procedura:

  1. Usa snapshot per creare una tabella Iceberg.

  2. Esegui il backup dei metadati della tabella originale utilizzando. AWS Glue APIs

  3. Abilita le gc.enabled proprietà della tabella Iceberg.

  4. Rilasciare la tabella originale.

  5. Si usa register_table per creare una nuova voce della tabella con il nome originale.

Nota: questa opzione richiede la gestione manuale delle chiamate AWS Glue API per il backup dei metadati.

Utilizzate la add_files procedura per incorporare le partizioni che sono state aggiunte o modificate dopo la migrazione, tenendo presente che la riaggiunta di partizioni modificate richiede prima l'eliminazione delle partizioni.