Bekerja dengan tabel Iceberg dengan menggunakan PyIceberg - AWS Bimbingan Preskriptif

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bekerja dengan tabel Iceberg dengan menggunakan PyIceberg

Bagian ini menjelaskan bagaimana Anda dapat berinteraksi dengan tabel Iceberg dengan menggunakan. PyIceberg Contoh yang diberikan adalah kode boilerplate yang dapat Anda jalankan di EC2 instans Amazon Linux 2023, AWS Lambdafungsi, atau lingkungan Python apa pun dengan kredensi yang dikonfigurasi dengan benar.AWS

Prasyarat

catatan

Contoh-contoh ini menggunakan PyIceberg 1.9.1.

Untuk bekerja dengan PyIceberg, Anda perlu PyIceberg dan AWS SDK untuk Python (Boto3) menginstal. Berikut adalah contoh bagaimana Anda dapat mengatur lingkungan virtual Python untuk bekerja dengan PyIceberg dan: AWS Glue Data Catalog

  1. Unduh PyIcebergdengan menggunakan penginstal paket pip python. Anda juga membutuhkan Boto3 untuk berinteraksi. Layanan AWS Anda dapat mengkonfigurasi lingkungan virtual Python lokal untuk menguji dengan menggunakan perintah ini:

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. Jalankan python untuk membuka shell Python dan menguji perintah.

Menghubungkan ke Katalog Data

Untuk mulai bekerja dengan tabel Iceberg di AWS Glue, Anda harus terlebih dahulu terhubung ke. AWS Glue Data CatalogĀ 

load_catalogFungsi menginisialisasi koneksi ke Katalog Data dengan membuat objek katalog yang berfungsi sebagai antarmuka utama Anda untuk semua operasi Iceberg:

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

Membuat daftar dan membuat database

Untuk membuat daftar database yang ada, gunakan list_namespaces fungsi:

databases = glue_catalog.list_namespaces() print(databases)

Untuk membuat database baru, gunakan create_namespace fungsi:

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Membuat dan menulis tabel Iceberg

Tabel yang tidak dipartisi

Berikut adalah contoh membuat tabel Iceberg yang tidak dipartisi dengan menggunakan fungsi: create_table

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

Anda dapat menggunakan list_tables fungsi untuk memeriksa daftar tabel di dalam database:

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

Anda dapat menggunakan append fungsi dan PyArrow untuk menyisipkan data di dalam tabel Iceberg:

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

Tabel yang dipartisi

Berikut adalah contoh membuat tabel Iceberg yang dipartisi dengan partisi tersembunyi dengan menggunakan fungsi dan: create_table PartitionSpec

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

Anda dapat menyisipkan data ke dalam tabel yang dipartisi dengan cara yang sama seperti untuk tabel yang tidak dipartisi. Partisi ditangani secara otomatis.

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

Membaca data

Anda dapat menggunakan PyIceberg scan fungsi untuk membaca data dari tabel Iceberg Anda. Anda dapat memfilter baris, memilih kolom tertentu, dan membatasi jumlah catatan yang dikembalikan.

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

Menghapus data

PyIceberg deleteFungsi ini memungkinkan Anda menghapus catatan dari tabel Anda dengan menggunakandelete_filter:

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

Mengakses metadata

PyIceberg menyediakan beberapa fungsi untuk mengakses metadata tabel. Berikut cara Anda dapat melihat informasi tentang snapshot tabel:

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

Untuk daftar rinci metadata yang tersedia, lihat bagian referensi kode metadata dokumentasi. PyIceberg

Menggunakan perjalanan waktu

Anda dapat menggunakan snapshot tabel untuk perjalanan waktu untuk mengakses status sebelumnya dari tabel Anda. Berikut cara melihat status tabel sebelum operasi terakhir:

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

Untuk daftar lengkap fungsi yang tersedia, lihat dokumentasi PyIceberg Python API.