使用 PyIceberg 處理 Iceberg 資料表 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 PyIceberg 處理 Iceberg 資料表

本節說明如何使用 PyIceberg 與 Iceberg 資料表互動。提供的範例是樣板程式碼,您可以在 Amazon Linux 2023 EC2 執行個體、AWS Lambda函數或任何具有正確設定AWS 憑證Python 環境中執行。

先決條件

注意

這些範例使用 PyIceberg 1.9.1。

若要使用 PyIceberg,您需要 適用於 Python (Boto3) 的 AWS SDK 安裝 PyIceberg 和 。以下是如何設定 Python 虛擬環境以使用 PyIceberg 和 的範例 AWS Glue Data Catalog:

  1. 使用 pip python 套件安裝程式下載 PyIceberg。您也需要 Boto3 才能與 互動 AWS 服務。您可以使用下列命令,設定要測試的本機 Python 虛擬環境:

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. 執行python 以開啟 Python shell 並測試命令。

連線至 Data Catalog

若要開始在 中使用 Iceberg 資料表 AWS Glue,您必須先連線到 AWS Glue Data Catalog。 

load_catalog函數會建立目錄物件,做為所有 Iceberg 操作的主要界面,以初始化與 Data Catalog 的連線:

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

列出和建立資料庫

若要列出現有的資料庫,請使用 list_namespaces函數:

databases = glue_catalog.list_namespaces() print(databases)

若要建立新的資料庫,請使用 create_namespace函數:

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

建立和寫入 Iceberg 資料表

未分割的資料表

以下是使用 create_table函數建立未分割 Iceberg 資料表的範例:

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

您可以使用 list_tables函數來檢查資料庫內的資料表清單:

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

您可以使用 append函數和 PyArrow 在 Iceberg 資料表中插入資料:

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

分割的資料表

以下是使用 create_table函數和 建立具有隱藏分割的分割 Iceberg 資料表的範例PartitionSpec

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

您可以將資料插入分割資料表的方式與未分割資料表相同。分割會自動處理。

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

讀取資料

您可以使用 PyIceberg scan函數從 Iceberg 資料表讀取資料。您可以篩選資料列、選取特定資料欄,以及限制傳回的記錄數目。

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

刪除資料

PyIceberg delete函數可讓您使用 從資料表中移除記錄delete_filter

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

存取中繼資料

PyIceberg 提供多種函數來存取資料表中繼資料。以下是您可以檢視資料表快照相關資訊的方式:

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

如需可用中繼資料的詳細清單,請參閱 PyIceberg 文件的中繼資料程式碼參考區段。

使用時間歷程

您可以使用資料表快照進行時間歷程,以存取資料表的先前狀態。以下是在上次操作之前檢視資料表狀態的方法:

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

如需可用函數的完整清單,請參閱 PyIcebergPython API 文件。