Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Arbeiten mit Iceberg-Tabellen mithilfe von PyIceberg
In diesem Abschnitt wird erklärt, wie Sie mit Iceberg-Tabellen interagieren können, indem Sie PyIceberg
Voraussetzungen
Um damit arbeiten zu können PyIceberg, benötigen Sie PyIceberg und AWS SDK für Python (Boto3) haben es installiert. Hier ist ein Beispiel dafür, wie Sie eine virtuelle Python-Umgebung einrichten können, um mit PyIceberg und zu arbeiten AWS Glue Data Catalog:
-
Laden Sie es mit PyIceberg
dem Pip-Python-Paketinstallationsprogramm herunter. Sie benötigen außerdem Boto3, um damit zu interagieren . AWS-Services Mit den folgenden Befehlen können Sie eine lokale virtuelle Python-Umgebung zum Testen konfigurieren: python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
-
Führen Sie aus
python
, um die Python-Shell zu öffnen und die Befehle zu testen.
Verbindung zum Datenkatalog herstellen
Um mit der Arbeit mit Iceberg-Tabellen in zu beginnen AWS Glue, müssen Sie zunächst eine Verbindung mit dem AWS Glue Data Catalog herstellen.
Die load_catalog
Funktion initialisiert eine Verbindung zum Datenkatalog, indem sie ein Katalogobjekt
from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )
Datenbanken auflisten und erstellen
Verwenden Sie die list_namespaces
Funktion, um bestehende Datenbanken aufzulisten:
databases = glue_catalog.list_namespaces() print(databases)
Verwenden Sie die create_namespace
Funktion, um eine neue Datenbank zu erstellen:
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
Iceberg-Tabellen erstellen und schreiben
Unpartitionierte Tabellen
Hier ist ein Beispiel für die Erstellung einer unpartitionierten Iceberg-Tabelle mithilfe der Funktion: create_table
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
Sie können die list_tables
Funktion verwenden, um die Liste der Tabellen in einer Datenbank zu überprüfen:
tables = glue_catalog.list_tables(namespace=database_name) print(tables)
Sie können die append
Funktion und PyArrow zum Einfügen von Daten in eine Iceberg-Tabelle verwenden:
import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)
Partitionierte Tabellen
Hier ist ein Beispiel für die Erstellung einer partitioniertencreate_table
PartitionSpec
from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )
Sie können Daten in eine partitionierte Tabelle genauso einfügen wie in eine unpartitionierte Tabelle. Die Partitionierung wird automatisch durchgeführt.
from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)
Lesen von Daten
Sie können die PyIceberg scan
Funktion verwenden, um Daten aus Ihren Iceberg-Tabellen zu lesen. Sie können Zeilen filtern, bestimmte Spalten auswählen und die Anzahl der zurückgegebenen Datensätze einschränken.
table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)
Löschen von Daten
Mit PyIceberg delete
dieser Funktion können Sie Datensätze aus Ihrer Tabelle entfernen, indem Sie Folgendes verwendendelete_filter
:
table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")
Zugreifen auf Metadaten
PyIceberg bietet mehrere Funktionen für den Zugriff auf Tabellenmetadaten. So können Sie Informationen zu Tabellenschnappschüssen anzeigen:
#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")
Eine ausführliche Liste der verfügbaren Metadaten finden Sie im Abschnitt mit der Referenz zum Metadatencode
Zeitreisen nutzen
Sie können Tabellenschnappschüsse für Zeitreisen verwenden, um auf frühere Status Ihrer Tabelle zuzugreifen. So zeigen Sie den Tabellenstatus vor der letzten Operation an:
second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)
Eine vollständige Liste der verfügbaren Funktionen finden Sie in der PyIceberg Python-API-Dokumentation