Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation des tables Iceberg en utilisant PyIceberg
Cette section explique comment vous pouvez interagir avec les tables Iceberg en utilisant PyIceberg
Prérequis
Note
Ces exemples utilisent le PyIceberg 1.9.1
Pour travailler avec PyIceberg, vous devez PyIceberg l' AWS SDK pour Python (Boto3) installer. Voici un exemple de la façon dont vous pouvez configurer un environnement virtuel Python pour travailler avec PyIceberg et AWS Glue Data Catalog :
-
Téléchargez PyIceberg
en utilisant le programme d'installation du package pip python . Vous avez également besoin de Boto3 pour interagir avec. Services AWS Vous pouvez configurer un environnement virtuel Python local à tester à l'aide des commandes suivantes : python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
-
Exécutez
python
pour ouvrir le shell Python et tester les commandes.
Connexion au catalogue de données
Pour commencer à utiliser les tables Iceberg dans AWS Glue, vous devez d'abord vous connecter au AWS Glue Data Catalog.
La load_catalog
fonction initialise une connexion au catalogue de données en créant un objet de catalogue
from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )
Lister et créer des bases de données
Pour répertorier les bases de données existantes, utilisez la list_namespaces
fonction :
databases = glue_catalog.list_namespaces() print(databases)
Pour créer une nouvelle base de données, utilisez la create_namespace
fonction :
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
Création et écriture de tables Iceberg
Tables non partitionnées
Voici un exemple de création d'une table Iceberg non partitionnée à l'aide de la fonction : create_table
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
Vous pouvez utiliser cette list_tables
fonction pour vérifier la liste des tables d'une base de données :
tables = glue_catalog.list_tables(namespace=database_name) print(tables)
Vous pouvez utiliser la append
fonction et insérer PyArrow des données dans une table Iceberg :
import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)
Tables partitionnées
Voici un exemple de création d'une table Iceberg partitionnéecreate_table
fonction et : PartitionSpec
from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )
Vous pouvez insérer des données dans une table partitionnée de la même manière que dans une table non partitionnée. Le partitionnement est géré automatiquement.
from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)
Lecture de données
Vous pouvez utiliser cette PyIceberg scan
fonction pour lire les données de vos tables Iceberg. Vous pouvez filtrer les lignes, sélectionner des colonnes spécifiques et limiter le nombre d'enregistrements renvoyés.
table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)
Suppression des données
La PyIceberg delete
fonction vous permet de supprimer des enregistrements de votre table en utilisant delete_filter
:
table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")
Accès aux métadonnées
PyIceberg fournit plusieurs fonctions pour accéder aux métadonnées des tables. Voici comment afficher les informations relatives aux instantanés de table :
#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")
Pour une liste détaillée des métadonnées disponibles, consultez la section de référence du code des métadonnées
Utiliser le voyage dans le temps
Vous pouvez utiliser des instantanés de tableau pour voyager dans le temps afin d'accéder aux états précédents de votre tableau. Voici comment afficher l'état de la table avant la dernière opération :
second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)
Pour une liste complète des fonctions disponibles, consultez la documentation de l'API PyIceberg Python