本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用使用 Iceberg 表来处理冰山表 PyIceberg
本节介绍如何使用PyIceberg
先决条件
注意
这些示例使用 PyIceberg 1.9.1
要使用 PyIceberg,您需要 PyIceberg 并 适用于 Python (Boto3) 的 AWS SDK 安装。以下是如何设置要使用的 Python 虚拟环境的示例, PyIceberg 以及 AWS Glue Data Catalog:
-
使用 pip python 软件包安装程序
进行下载PyIceberg 。您还需要与 Boto3 进行交互。 AWS 服务您可以使用以下命令配置本地 Python 虚拟环境进行测试: python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
-
运行打开
python
Python 外壳并测试命令。
连接到数据目录
要开始在中使用 Iceberg 表 AWS Glue,您首先需要连接到。 AWS Glue Data Catalog
该load_catalog
函数通过创建一个作为所有 Iceberg 操作的主接口的目录
from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )
列出和创建数据库
要列出现有数据库,请使用以下list_namespaces
函数:
databases = glue_catalog.list_namespaces() print(databases)
要创建新数据库,请使用以下create_namespace
函数:
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
创建和写作 Iceberg 表
未分区的表
以下是使用函数创建未分区的 Iceberg 表的示例:create_table
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
你可以使用该list_tables
函数来检查数据库中的表列表:
tables = glue_catalog.list_tables(namespace=database_name) print(tables)
你可以使用append
函数 and 在 Iceberg 表中插入数据: PyArrow
import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)
分区表
以下是使用函数和创建带有隐藏分区的分create_table
PartitionSpec
from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )
您可以采用与未分区表相同的方式向分区表中插入数据。分区是自动处理的。
from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)
读取数据
您可以使用该 PyIceberg scan
函数从 Iceberg 表中读取数据。您可以筛选行、选择特定列并限制返回的记录数。
table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)
删除数据
该 PyIceberg delete
函数允许您使用以下方法从表中删除记录delete_filter
:
table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")
访问元数据
PyIceberg 提供了多种访问表元数据的函数。您可以通过以下方式查看有关表快照的信息:
#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")
有关可用元数据的详细列表,请参阅 PyIceberg 文档的元数据
使用时空旅行
您可以使用时空旅行的表快照来访问表的先前状态。以下是查看上次操作之前的表状态的方法:
second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)
有关可用函数的完整列表,请参阅 PyIceberg Python API