기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
PyIceberg 테이블 작업
이 섹션에서는 PyIceberg
사전 조건
참고
이 예제에서는 PyIceberg 1.9.1
PyIceberg를 사용하려면 PyIceberg 및가 AWS SDK for Python (Boto3) 설치되어 있어야 합니다. 다음은 PyIceberg 및와 함께 작동하도록 Python 가상 환경을 설정하는 방법의 예입니다. AWS Glue Data Catalog
-
pip python 패키지 설치
관리자를 사용하여 PyIceberg 를 다운로드합니다. 또한와 상호 작용하려면 Boto3 가 필요합니다 AWS 서비스. 다음 명령을 사용하여 테스트하도록 로컬 Python 가상 환경을 구성할 수 있습니다. python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
-
를 실행
python
하여 Python 셸을 열고 명령을 테스트합니다.
데이터 카탈로그에 연결
에서 Iceberg 테이블 작업을 시작하려면 AWS Glue먼저에 연결해야 합니다 AWS Glue Data Catalog.
load_catalog
함수는 모든 Iceberg 작업에 대한 기본 인터페이스 역할을 하는 카탈로그
from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )
데이터베이스 나열 및 생성
기존 데이터베이스를 나열하려면 list_namespaces
함수를 사용합니다.
databases = glue_catalog.list_namespaces() print(databases)
새 데이터베이스를 생성하려면 create_namespace
함수를 사용합니다.
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
Iceberg 테이블 생성 및 작성
분할되지 않은 테이블
다음은 create_table
함수를 사용하여 파티셔닝되지 않은 Iceberg 테이블을 생성하는 예제입니다.
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
list_tables
함수를 사용하여 데이터베이스 내의 테이블 목록을 확인할 수 있습니다.
tables = glue_catalog.list_tables(namespace=database_name) print(tables)
함수와 PyArrow를append
사용하여 Iceberg 테이블 내에 데이터를 삽입할 수 있습니다.
import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)
분할된 테이블
다음은 create_table
함수 및를 사용하여 숨겨진PartitionSpec
.
from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )
파티셔닝되지 않은 테이블과 동일한 방식으로 파티셔닝된 테이블에 데이터를 삽입할 수 있습니다. 파티셔닝은 자동으로 처리됩니다.
from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)
데이터 읽기
PyIceberg scan
함수를 사용하여 Iceberg 테이블에서 데이터를 읽을 수 있습니다. 행을 필터링하고, 특정 열을 선택하고, 반환된 레코드 수를 제한할 수 있습니다.
table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)
데이터 삭제
PyIceberg delete
함수를 사용하면 delete_filter
를 사용하여 테이블에서 레코드를 제거할 수 있습니다.
table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")
메타데이터 액세스
PyIceberg는 테이블 메타데이터에 액세스하는 여러 함수를 제공합니다. 테이블 스냅샷에 대한 정보를 보는 방법은 다음과 같습니다.
#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")
사용 가능한 메타데이터의 자세한 목록은 PyIceberg 설명서의 메타데이터
시간 이동 사용
테이블 스냅샷을 시간 이동에 사용하여 테이블의 이전 상태에 액세스할 수 있습니다. 다음은 마지막 작업 이전의 테이블 상태를 보는 방법입니다.
second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)
사용 가능한 함수의 전체 목록은 PyIcebergPython API