Utilisation des tables Iceberg en utilisant PyIceberg - AWS Conseils prescriptifs

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation des tables Iceberg en utilisant PyIceberg

Cette section explique comment vous pouvez interagir avec les tables Iceberg en utilisant PyIceberg. Les exemples fournis sont du code standard que vous pouvez exécuter sur des EC2 instances, des AWS Lambdafonctions Amazon Linux 2023 ou tout autre environnement Python avec des informations d'identification correctement configurées.AWS

Prérequis

Note

Ces exemples utilisent le PyIceberg 1.9.1.

Pour travailler avec PyIceberg, vous devez PyIceberg l' AWS SDK pour Python (Boto3) installer. Voici un exemple de la façon dont vous pouvez configurer un environnement virtuel Python pour travailler avec PyIceberg et AWS Glue Data Catalog :

  1. Téléchargez PyIcebergen utilisant le programme d'installation du package pip python. Vous avez également besoin de Boto3 pour interagir avec. Services AWS Vous pouvez configurer un environnement virtuel Python local à tester à l'aide des commandes suivantes :

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. Exécutez python pour ouvrir le shell Python et tester les commandes.

Connexion au catalogue de données

Pour commencer à utiliser les tables Iceberg dans AWS Glue, vous devez d'abord vous connecter au AWS Glue Data Catalog. 

La load_catalog fonction initialise une connexion au catalogue de données en créant un objet de catalogue qui sert d'interface principale pour toutes les opérations Iceberg :

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

Lister et créer des bases de données

Pour répertorier les bases de données existantes, utilisez la list_namespaces fonction :

databases = glue_catalog.list_namespaces() print(databases)

Pour créer une nouvelle base de données, utilisez la create_namespace fonction :

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Création et écriture de tables Iceberg

Tables non partitionnées

Voici un exemple de création d'une table Iceberg non partitionnée à l'aide de la fonction : create_table

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

Vous pouvez utiliser cette list_tables fonction pour vérifier la liste des tables d'une base de données :

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

Vous pouvez utiliser la append fonction et insérer PyArrow des données dans une table Iceberg :

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

Tables partitionnées

Voici un exemple de création d'une table Iceberg partitionnée avec partitionnement masqué à l'aide de la create_table fonction et : PartitionSpec

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

Vous pouvez insérer des données dans une table partitionnée de la même manière que dans une table non partitionnée. Le partitionnement est géré automatiquement.

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

Lecture de données

Vous pouvez utiliser cette PyIceberg scan fonction pour lire les données de vos tables Iceberg. Vous pouvez filtrer les lignes, sélectionner des colonnes spécifiques et limiter le nombre d'enregistrements renvoyés.

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

Suppression des données

La PyIceberg delete fonction vous permet de supprimer des enregistrements de votre table en utilisant delete_filter :

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

Accès aux métadonnées

PyIceberg fournit plusieurs fonctions pour accéder aux métadonnées des tables. Voici comment afficher les informations relatives aux instantanés de table :

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

Pour une liste détaillée des métadonnées disponibles, consultez la section de référence du code des métadonnées de la PyIceberg documentation.

Utiliser le voyage dans le temps

Vous pouvez utiliser des instantanés de tableau pour voyager dans le temps afin d'accéder aux états précédents de votre tableau. Voici comment afficher l'état de la table avant la dernière opération :

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

Pour une liste complète des fonctions disponibles, consultez la documentation de l'API PyIceberg Python.