Arbeiten mit Iceberg-Tabellen mithilfe von PyIceberg - AWS Präskriptive Leitlinien

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Arbeiten mit Iceberg-Tabellen mithilfe von PyIceberg

In diesem Abschnitt wird erklärt, wie Sie mit Iceberg-Tabellen interagieren können, indem Sie PyIceberg Bei den bereitgestellten Beispielen handelt es sich um Standardcode, den Sie auf Amazon Linux EC2 2023-Instances, AWS Lambda-Funktionen oder einer beliebigen Python-Umgebung mit ordnungsgemäß konfigurierten AWS Anmeldeinformationen ausführen können.

Voraussetzungen

Um damit arbeiten zu können PyIceberg, benötigen Sie PyIceberg und AWS SDK für Python (Boto3) haben es installiert. Hier ist ein Beispiel dafür, wie Sie eine virtuelle Python-Umgebung einrichten können, um mit PyIceberg und zu arbeiten AWS Glue Data Catalog:

  1. Laden Sie es mit PyIcebergdem Pip-Python-Paketinstallationsprogramm herunter. Sie benötigen außerdem Boto3, um damit zu interagieren. AWS-Services Mit den folgenden Befehlen können Sie eine lokale virtuelle Python-Umgebung zum Testen konfigurieren:

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. Führen Sie auspython, um die Python-Shell zu öffnen und die Befehle zu testen.

Verbindung zum Datenkatalog herstellen

Um mit der Arbeit mit Iceberg-Tabellen in zu beginnen AWS Glue, müssen Sie zunächst eine Verbindung mit dem AWS Glue Data Catalog herstellen. 

Die load_catalog Funktion initialisiert eine Verbindung zum Datenkatalog, indem sie ein Katalogobjekt erstellt, das als primäre Schnittstelle für alle Iceberg-Operationen dient:

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

Datenbanken auflisten und erstellen

Verwenden Sie die list_namespaces Funktion, um bestehende Datenbanken aufzulisten:

databases = glue_catalog.list_namespaces() print(databases)

Verwenden Sie die create_namespace Funktion, um eine neue Datenbank zu erstellen:

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Iceberg-Tabellen erstellen und schreiben

Unpartitionierte Tabellen

Hier ist ein Beispiel für die Erstellung einer unpartitionierten Iceberg-Tabelle mithilfe der Funktion: create_table

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

Sie können die list_tables Funktion verwenden, um die Liste der Tabellen in einer Datenbank zu überprüfen:

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

Sie können die append Funktion und PyArrow zum Einfügen von Daten in eine Iceberg-Tabelle verwenden:

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

Partitionierte Tabellen

Hier ist ein Beispiel für die Erstellung einer partitionierten Iceberg-Tabelle mit versteckter Partitionierung mithilfe der Funktion und: create_table PartitionSpec

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

Sie können Daten in eine partitionierte Tabelle genauso einfügen wie in eine unpartitionierte Tabelle. Die Partitionierung wird automatisch durchgeführt.

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

Lesen von Daten

Sie können die PyIceberg scan Funktion verwenden, um Daten aus Ihren Iceberg-Tabellen zu lesen. Sie können Zeilen filtern, bestimmte Spalten auswählen und die Anzahl der zurückgegebenen Datensätze einschränken.

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

Löschen von Daten

Mit PyIceberg delete dieser Funktion können Sie Datensätze aus Ihrer Tabelle entfernen, indem Sie Folgendes verwendendelete_filter:

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

Zugreifen auf Metadaten

PyIceberg bietet mehrere Funktionen für den Zugriff auf Tabellenmetadaten. So können Sie Informationen zu Tabellenschnappschüssen anzeigen:

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

Eine ausführliche Liste der verfügbaren Metadaten finden Sie im Abschnitt mit der Referenz zum Metadatencode der PyIceberg Dokumentation.

Zeitreisen nutzen

Sie können Tabellenschnappschüsse für Zeitreisen verwenden, um auf frühere Status Ihrer Tabelle zuzugreifen. So zeigen Sie den Tabellenstatus vor der letzten Operation an:

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

Eine vollständige Liste der verfügbaren Funktionen finden Sie in der PyIceberg Python-API-Dokumentation.