Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Bekerja dengan tabel Iceberg dengan menggunakan PyIceberg
Bagian ini menjelaskan bagaimana Anda dapat berinteraksi dengan tabel Iceberg dengan menggunakan. PyIceberg
Prasyarat
catatan
Contoh-contoh ini menggunakan PyIceberg 1.9.1
Untuk bekerja dengan PyIceberg, Anda perlu PyIceberg dan AWS SDK untuk Python (Boto3) menginstal. Berikut adalah contoh bagaimana Anda dapat mengatur lingkungan virtual Python untuk bekerja dengan PyIceberg dan: AWS Glue Data Catalog
-
Unduh PyIceberg
dengan menggunakan penginstal paket pip python . Anda juga membutuhkan Boto3 untuk berinteraksi. Layanan AWS Anda dapat mengkonfigurasi lingkungan virtual Python lokal untuk menguji dengan menggunakan perintah ini: python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
-
Jalankan
python
untuk membuka shell Python dan menguji perintah.
Menghubungkan ke Katalog Data
Untuk mulai bekerja dengan tabel Iceberg di AWS Glue, Anda harus terlebih dahulu terhubung ke. AWS Glue Data CatalogĀ
load_catalog
Fungsi menginisialisasi koneksi ke Katalog Data dengan membuat objek katalog
from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )
Membuat daftar dan membuat database
Untuk membuat daftar database yang ada, gunakan list_namespaces
fungsi:
databases = glue_catalog.list_namespaces() print(databases)
Untuk membuat database baru, gunakan create_namespace
fungsi:
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
Membuat dan menulis tabel Iceberg
Tabel yang tidak dipartisi
Berikut adalah contoh membuat tabel Iceberg yang tidak dipartisi dengan menggunakan fungsi: create_table
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
Anda dapat menggunakan list_tables
fungsi untuk memeriksa daftar tabel di dalam database:
tables = glue_catalog.list_tables(namespace=database_name) print(tables)
Anda dapat menggunakan append
fungsi dan PyArrow untuk menyisipkan data di dalam tabel Iceberg:
import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)
Tabel yang dipartisi
Berikut adalah contoh membuat tabel Iceberg yang dipartisicreate_table
PartitionSpec
from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )
Anda dapat menyisipkan data ke dalam tabel yang dipartisi dengan cara yang sama seperti untuk tabel yang tidak dipartisi. Partisi ditangani secara otomatis.
from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)
Membaca data
Anda dapat menggunakan PyIceberg scan
fungsi untuk membaca data dari tabel Iceberg Anda. Anda dapat memfilter baris, memilih kolom tertentu, dan membatasi jumlah catatan yang dikembalikan.
table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)
Menghapus data
PyIceberg delete
Fungsi ini memungkinkan Anda menghapus catatan dari tabel Anda dengan menggunakandelete_filter
:
table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")
Mengakses metadata
PyIceberg menyediakan beberapa fungsi untuk mengakses metadata tabel. Berikut cara Anda dapat melihat informasi tentang snapshot tabel:
#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")
Untuk daftar rinci metadata yang tersedia, lihat bagian referensi kode metadata
Menggunakan perjalanan waktu
Anda dapat menggunakan snapshot tabel untuk perjalanan waktu untuk mengakses status sebelumnya dari tabel Anda. Berikut cara melihat status tabel sebelum operasi terakhir:
second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)
Untuk daftar lengkap fungsi yang tersedia, lihat dokumentasi PyIceberg Python API