PyIceberg を使用した Iceberg テーブルの操作 - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

PyIceberg を使用した Iceberg テーブルの操作

このセクションでは、PyIceberg を使用して Iceberg テーブルを操作する方法について説明します。提供されている例は、Amazon Linux 2023 EC2 インスタンス、AWS Lambda関数、または適切に設定されたAWS 認証情報を持つ任意の Python 環境で実行できる定型コードです。

前提条件

注記

これらの例ではPyIceberg 1.9.1 を使用しています。

PyIceberg を使用するには、PyIceberg と AWS SDK for Python (Boto3) がインストールされている必要があります。PyIceberg と で動作するように Python 仮想環境を設定する方法の例を次に示します AWS Glue Data Catalog。

  1. pip python パッケージインストーラを使用して PyIceberg をダウンロードします。Boto3 の操作も必要です AWS のサービス。次のコマンドを使用して、テストするローカル Python 仮想環境を設定できます。

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. を実行してpython Python シェルを開き、コマンドをテストします。

データカタログへの接続

で Iceberg テーブルの操作を開始するには AWS Glue、まず に接続する必要があります AWS Glue Data Catalog。 

このload_catalog関数は、すべての Iceberg オペレーションのプライマリインターフェイスとして機能するカタログオブジェクトを作成して、データカタログへの接続を初期化します。

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

データベースの一覧表示と作成

既存のデータベースを一覧表示するには、 list_namespaces関数を使用します。

databases = glue_catalog.list_namespaces() print(databases)

新しいデータベースを作成するには、 create_namespace関数を使用します。

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Iceberg テーブルの作成と書き込み

パーティション化されていないテーブル

create_table 関数を使用してパーティション分割されていない Iceberg テーブルを作成する例を次に示します。

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

list_tables 関数を使用して、データベース内のテーブルのリストを確認できます。

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

関数と PyArrow を使用してappend、Iceberg テーブル内にデータを挿入できます。

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

パーティションテーブル

create_table 関数 と を使用して、パーティション分割を非表示にしてパーティション分割された Iceberg テーブルを作成する例を次に示しますPartitionSpechttps://iceberg.apache.org/docs/1.4.0/partitioning/#icebergs-hidden-partitioning

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

パーティション分割されていないテーブルの場合と同じ方法で、パーティション分割されたテーブルにデータを挿入できます。パーティショニングは自動的に処理されます。

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

データの読み込み

PyIceberg scan関数を使用して Iceberg テーブルからデータを読み取ることができます。行をフィルタリングし、特定の列を選択し、返されるレコードの数を制限できます。

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

データの削除

PyIceberg delete関数を使用すると、 を使用してテーブルからレコードを削除できますdelete_filter

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

メタデータへのアクセス

PyIceberg には、テーブルメタデータにアクセスするための関数がいくつか用意されています。テーブルスナップショットに関する情報を表示する方法は次のとおりです。

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

使用可能なメタデータの詳細なリストについては、PyIceberg ドキュメントのメタデータコードリファレンスセクションを参照してください。

タイムトラベルの使用

タイムトラベルにテーブルスナップショットを使用して、テーブルの以前の状態にアクセスできます。最後のオペレーションの前にテーブルの状態を表示する方法は次のとおりです。

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

使用可能な関数の完全なリストについては、PyIcebergPython API ドキュメントを参照してください。