Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Migrazione sul posto
La migrazione sul posto elimina la necessità di riscrivere tutti i file di dati. I file di metadati Iceberg vengono invece generati e collegati ai file di dati esistenti. Questo metodo è in genere più veloce ed economico, soprattutto per set di dati o tabelle di grandi dimensioni con formati di file compatibili come Parquet, Avro e ORC.
Nota
La migrazione sul posto non può essere utilizzata durante la migrazione a Amazon S3 Tables.
Iceberg offre due opzioni principali per implementare la migrazione sul posto:
-
Utilizzo della procedura snapshot
per creare una nuova tabella Iceberg mantenendo invariata la tabella di origine. Per ulteriori informazioni, consulta Snapshot Table nella documentazione di Iceberg. -
Utilizzo della procedura di migrazione
per creare una nuova tabella Iceberg in sostituzione della tabella di origine. Per ulteriori informazioni, consulta Migrate Table nella documentazione di Iceberg. Sebbene questa procedura funzioni con Hive Metastore (HMS), attualmente non è compatibile con. AWS Glue Data Catalog La procedura di replica della migrazione delle tabelle riportata più AWS Glue Data Catalog avanti in questa guida fornisce una soluzione alternativa per ottenere un risultato simile con Data Catalog.
Dopo aver eseguito la migrazione sul posto utilizzando uno dei due snapshot
omigrate
, alcuni file di dati potrebbero rimanere non migrati. Ciò accade in genere quando gli autori continuano a scrivere nella tabella di origine durante o dopo la migrazione. Per incorporare questi file rimanenti nella tabella Iceberg, è possibile utilizzare la procedura add_files
Supponiamo che tu abbia una products
tabella basata su Parquet che è stata creata e popolata in Athena come segue:
CREATE EXTERNAL TABLE mydb.products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; INSERT INTO mydb.products VALUES (1001, 'Smartphone', 'electronics'), (1002, 'Laptop', 'electronics'), (2001, 'T-Shirt', 'clothing'), (2002, 'Jeans', 'clothing');
Le sezioni seguenti spiegano come utilizzare le procedure snapshot
and migrate
con questa tabella.
Opzione 1: procedura di istantanea
La snapshot
procedura crea una nuova tabella Iceberg con un nome diverso ma replica lo schema e il partizionamento della tabella di origine. Questa operazione lascia la tabella di origine completamente invariata sia durante che dopo l'azione. Crea in modo efficace una copia leggera della tabella, che è particolarmente utile per testare scenari o esplorare i dati senza rischiare di modificare la fonte di dati originale. Questo approccio consente un periodo di transizione in cui sia la tabella originale che la tabella Iceberg rimangono disponibili (vedi le note alla fine di questa sezione). Una volta completato il test, puoi portare la tua nuova tabella Iceberg in produzione trasferendo tutti gli autori e i lettori alla nuova tabella.
Puoi eseguire la snapshot
procedura utilizzando Spark in qualsiasi modello di distribuzione Amazon EMR (ad esempio, Amazon EMR on, Amazon EMR su EKS EC2, EMR Serverless) e. AWS Glue
Per testare la migrazione sul posto con la procedura Spark, segui questi passaggisnapshot
:
-
Avvia un'applicazione Spark e configura la sessione Spark con le seguenti impostazioni:
-
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
-
"spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"
-
"spark.sql.catalog.spark_catalog.type":"glue"
-
"spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
-
-
Esegui la
snapshot
procedura per creare una nuova tabella Iceberg che punti ai file di dati della tabella originale:spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False)
Il dataframe di output contiene
imported_files_count
(il numero di file che sono stati aggiunti). -
Convalida la nuova tabella interrogandola:
spark.sql(f""" SELECT * FROM mydb.products_iceberg LIMIT 10 """ ).show(truncate=False)
Note
-
Dopo aver eseguito la procedura, qualsiasi modifica del file di dati sulla tabella di origine non sincronizzerà la tabella generata. I nuovi file aggiunti non saranno visibili nella tabella Iceberg e i file rimossi influiranno sulle funzionalità di interrogazione nella tabella Iceberg. Per evitare problemi di sincronizzazione:
-
Se la nuova tabella Iceberg è destinata all'uso in produzione, interrompi tutti i processi che scrivono sulla tabella originale e reindirizzali alla nuova tabella.
-
Se hai bisogno di un periodo di transizione o se la nuova tabella Iceberg è a scopo di test, consulta Mantenere le tabelle Iceberg sincronizzate dopo la migrazione sul posto più avanti in questa sezione per indicazioni su come mantenere la sincronizzazione delle tabelle.
-
-
Quando si utilizza la
snapshot
procedura, lagc.enabled
proprietà viene impostata sulle proprietà della tabella Iceberg creata.true
Questa impostazione proibisce azioni comeexpire_snapshots
, oDROP TABLE
con l'PURGE
opzioneremove_orphan_files
, che eliminerebbero fisicamente i file di dati. Le operazioni di eliminazione o unione di Iceberg, che non hanno un impatto diretto sui file sorgente, sono ancora consentite. -
Per rendere la tua nuova tabella Iceberg completamente funzionante, senza limiti alle azioni che eliminano fisicamente i file di dati, puoi modificare la proprietà della
gc.enabled
tabella in.false
Tuttavia, questa impostazione consentirà azioni che influiscono sui file di dati di origine, che potrebbero compromettere l'accesso alla tabella originale. Pertanto, modificate lagc.enabled
proprietà solo se non è più necessario mantenere la funzionalità della tabella originale. Per esempio:spark.sql(f""" ALTER TABLE mydb.products_iceberg SET TBLPROPERTIES ('gc.enabled' = 'false'); """)
Opzione 2: procedura di migrazione
La migrate
procedura crea una nuova tabella Iceberg con lo stesso nome, schema e partizionamento della tabella di origine. Quando viene eseguita, questa procedura blocca la tabella di origine e la rinomina in <table_name>_BACKUP_
(o in un nome personalizzato specificato dal parametro della procedura). backup_table_name
Nota
Se si imposta il parametro della drop_backup
procedura sutrue
, la tabella originale non verrà conservata come backup.
Di conseguenza, la procedura della migrate
tabella richiede che tutte le modifiche che influiscono sulla tabella di origine vengano interrotte prima dell'esecuzione dell'azione. Prima di eseguire la migrate
procedura:
-
Interrompi tutti gli scrittori che interagiscono con la tabella dei sorgenti.
-
Modifica lettori e scrittori che non supportano nativamente Iceberg per abilitare il supporto Iceberg.
Per esempio:
-
Athena continua a lavorare senza modifiche.
-
Spark richiede:
-
File Iceberg Java Archive (JAR) da includere nel classpath (consulta le sezioni Lavorare con Iceberg in Amazon EMR e Lavorare con Iceberg AWS Glue nelle sezioni precedenti di questa guida).
-
Le seguenti configurazioni del catalogo delle sessioni Spark (utilizzate
SparkSessionCatalog
per aggiungere il supporto Iceberg mantenendo le funzionalità di catalogo integrate per le tabelle non Iceberg):-
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
-
"spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog"
-
"spark.sql.catalog.spark_catalog.type":"glue"
-
"spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
-
-
Dopo aver eseguito la procedura, potete riavviare i vostri masterizzatori con la nuova configurazione di Iceberg.
Attualmente, la migrate
procedura non è compatibile con AWS Glue Data Catalog, poiché il Data Catalog non supporta l'RENAME
operazione. Pertanto, ti consigliamo di utilizzare questa procedura solo quando lavori con Hive Metastore. Se utilizzi il Data Catalog, consulta la sezione successiva per un approccio alternativo.
Puoi eseguire la migrate
procedura su tutti i modelli di distribuzione di Amazon EMR (Amazon EMR su, Amazon EMR su EKS, EMR Serverless) AWS Glue e, ma richiede una connessione configurata a Hive Metastore. EC2 Amazon EMR on EC2 è la scelta consigliata perché fornisce una configurazione Hive Metastore integrata, che riduce al minimo la complessità della configurazione.
Per testare la migrazione sul posto con la procedura migrate
Spark da un Amazon EMR su un EC2 cluster configurato con Hive Metastore, segui questi passaggi:
-
Avvia un'applicazione Spark e configura la sessione Spark per utilizzare l'implementazione del catalogo Iceberg Hive. Ad esempio, se utilizzi la
pyspark
CLI:pyspark --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive
-
Crea una
products
tabella in Hive Metastore. Questa è la tabella di origine, che esiste già in una migrazione tipica.-
Crea la tabella Hive
products
esterna in Hive Metastore per puntare ai dati esistenti in Amazon S3:spark.sql(f""" CREATE EXTERNAL TABLE products ( product_id INT, product_name STRING ) PARTITIONED BY (category STRING) STORED AS PARQUET LOCATION 's3://amzn-s3-demo-bucket/products/'; """ )
-
Aggiungi le partizioni esistenti utilizzando il comando:
MSCK REPAIR TABLE
spark.sql(f""" MSCK REPAIR TABLE products """ )
-
Verifica che la tabella contenga dati eseguendo una
SELECT
query:spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)
Output di esempio:
-
-
Usa la procedura Iceberg:
migrate
df_res=spark.sql(f""" CALL system.migrate( table => 'default.products' ) """ ) df_res.show()
Il dataframe di output contiene
migrated_files_count
(il numero di file che sono stati aggiunti alla tabella Iceberg): -
Conferma che la tabella di backup è stata creata:
spark.sql("show tables").show()
Output di esempio:
-
Convalida l'operazione interrogando la tabella Iceberg:
spark.sql(f""" SELECT * FROM products """ ).show(truncate=False)
Note
-
Dopo aver eseguito la procedura, tutti i processi correnti che eseguono query o scrivono nella tabella di origine ne risentiranno se non sono configurati correttamente con il supporto Iceberg. Pertanto, ti consigliamo di seguire questi passaggi:
-
Arresta tutti i processi utilizzando la tabella di origine prima della migrazione.
-
Eseguire la migrazione.
-
Riattiva i processi utilizzando le impostazioni Iceberg appropriate.
-
-
Se durante il processo di migrazione si verificano modifiche ai file di dati (vengono aggiunti nuovi file o rimossi), la tabella generata non sarà sincronizzata. Per le opzioni di sincronizzazione, vedi Mantenere sincronizzate le tabelle Iceberg dopo la migrazione sul posto più avanti in questa sezione.
Replica della procedura di migrazione delle tabelle in AWS Glue Data Catalog
È possibile replicare il risultato della procedura di migrazione in AWS Glue Data Catalog (eseguire il backup della tabella originale e sostituirla con una tabella Iceberg) seguendo questi passaggi:
-
Utilizzate la procedura snapshot per creare una nuova tabella Iceberg che punti ai file di dati della tabella originale.
-
Esegui il backup dei metadati della tabella originale nel Data Catalog:
-
Utilizza l'GetTableAPI per recuperare la definizione della tabella di origine.
-
Utilizza l'GetPartitionsAPI per recuperare la definizione della partizione della tabella di origine.
-
Utilizza l'CreateTableAPI per creare una tabella di backup nel Data Catalog.
-
Utilizza l'BatchCreatePartitionAPI CreatePartitiono per registrare le partizioni nella tabella di backup nel Data Catalog.
-
-
Modifica la proprietà della tabella
gc.enabled
Iceberg perfalse
abilitare le operazioni complete della tabella. -
Rilasciare la tabella originale.
-
Individua il file JSON dei metadati della tabella Iceberg nella cartella dei metadati della posizione principale della tabella.
-
Registra la nuova tabella nel Data Catalog utilizzando la procedura register_table
con il nome della tabella originale e la posizione del metadata.json
file creato dalla procedura:snapshot
spark.sql(f""" CALL system.register_table( table => 'mydb.products', metadata_file => '{iceberg_metadata_file}' ) """ ).show(truncate=False)
Mantenere sincronizzate le tabelle Iceberg dopo la migrazione sul posto
La add_files
procedura fornisce un modo flessibile per incorporare i dati esistenti nelle tabelle Iceberg. In particolare, registra i file di dati esistenti (come i file Parquet) facendo riferimento ai loro percorsi assoluti nel livello di metadati di Iceberg. Per impostazione predefinita, la procedura aggiunge file da tutte le partizioni di tabella a una tabella Iceberg, ma è possibile aggiungere selettivamente file da partizioni specifiche. Questo approccio selettivo è particolarmente utile in diversi scenari:
-
Quando vengono aggiunte nuove partizioni alla tabella di origine dopo la migrazione iniziale.
-
Quando i file di dati vengono aggiunti o rimossi da partizioni esistenti dopo la migrazione iniziale. Tuttavia, la riaggiunta di partizioni modificate richiede prima l'eliminazione delle partizioni. Ulteriori informazioni al riguardo sono fornite più avanti in questa sezione.
Ecco alcune considerazioni sull'utilizzo della add_file
procedura dopo l'esecuzione della migrazione sul posto (snapshot
omigrate
), per mantenere la nuova tabella Iceberg sincronizzata con i file di dati di origine:
-
Quando vengono aggiunti nuovi dati a nuove partizioni nella tabella di origine, utilizzate la
add_files
procedura con l'partition_filter
opzione per incorporare selettivamente queste aggiunte nella tabella Iceberg:spark.sql(f""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)
oppure:
spark.sql(f""" CALL system.add_files( source_table => '`parquet`.`s3://amzn-s3-demo-bucket/products/`', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ).show(truncate=False)
-
La
add_files
procedura cerca i file nell'intera tabella di origine o in partizioni specifiche quando specificate l'partition_filter
opzione e tenta di aggiungere tutti i file trovati alla tabella Iceberg. Per impostazione predefinita, la proprietà dellacheck_duplicate_files
procedura è impostata sutrue
, il che impedisce l'esecuzione della procedura se i file esistono già nella tabella Iceberg. Questo è importante perché non esiste un'opzione integrata per ignorare i file aggiunti in precedenza e la disattivazionecheck_duplicate_files
comporterà l'aggiunta di due volte dei file, con la conseguente creazione di duplicati. Quando vengono aggiunti nuovi file alla tabella di origine, procedi nel seguente modo:-
Per le nuove partizioni, usa
add_files
withpartition_filter
a per importare solo i file dalla nuova partizione. -
Per le partizioni esistenti, eliminate prima la partizione dalla tabella Iceberg, quindi eseguitela nuovamente
add_files
per quella partizione, specificando.partition_filter
Per esempio:# We initially perform in-place migration with snapshot spark.sql(f""" CALL system.snapshot( source_table => 'mydb.products', table => 'mydb.products_iceberg', location => 's3://amzn-s3-demo-bucket/products_iceberg/' ) """ ).show(truncate=False) # Then on the source table, some new files were generated under the category='electronics' partition. Example: spark.sql(""" INSERT INTO mydb.products VALUES (1003, 'Tablet', 'electronics') """) # We delete the modified partition from the Iceberg table. Note this is a metadata operation only spark.sql(""" DELETE FROM mydb.products_iceberg WHERE category = 'electronics' """) # We add_files from the modified partition spark.sql(""" CALL system.add_files( source_table => 'mydb.products', table => 'mydb.products_iceberg', partition_filter => map('category', 'electronics') ) """).show(truncate=False)
-
Nota
Ogni add_files
operazione genera una nuova istantanea della tabella Iceberg con dati aggiunti.
Scelta della giusta strategia di migrazione in atto
Per scegliere la migliore strategia di migrazione sul posto, considera le domande nella tabella seguente.
Domanda |
Raccomandazione |
Spiegazione |
---|---|---|
Vuoi migrare rapidamente senza riscrivere i dati mantenendo entrambe le tabelle Hive e Iceberg accessibili per test o transizioni graduali? |
|
Utilizzate la |
Stai usando Hive Metastore e vuoi sostituire immediatamente la tua tabella Hive con una tabella Iceberg, senza riscrivere i dati? |
|
Utilizzare la Nota: questa opzione è compatibile con Hive Metastore ma non con. AWS Glue Data Catalog Utilizzate la |
Stai usando AWS Glue Data Catalog e vuoi sostituire immediatamente la tua tabella Hive con una tabella Iceberg, senza riscrivere i dati? |
Adattamento della |
Replica il comportamento
Nota: questa opzione richiede la gestione manuale delle chiamate AWS Glue API per il backup dei metadati. Utilizzate la |