PyIceberg 테이블 작업 - AWS 권장 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

PyIceberg 테이블 작업

이 섹션에서는 PyIceberg를 사용하여 Iceberg 테이블과 상호 작용하는 방법을 설명합니다. 제공된 예제는 Amazon Linux 2023 EC2 인스턴스, AWS Lambda 함수 또는 자격 AWS 증명이 올바르게 구성된 Python 환경에서 실행할 수 있는 표준 문안 코드입니다.

사전 조건

참고

이 예제에서는 PyIceberg 1.9.1을 사용합니다.

PyIceberg를 사용하려면 PyIceberg 및가 AWS SDK for Python (Boto3) 설치되어 있어야 합니다. 다음은 PyIceberg 및와 함께 작동하도록 Python 가상 환경을 설정하는 방법의 예입니다. AWS Glue Data Catalog

  1. pip python 패키지 설치 관리자를 사용하여 PyIceberg를 다운로드합니다. 또한와 상호 작용하려면 Boto3가 필요합니다 AWS 서비스. 다음 명령을 사용하여 테스트하도록 로컬 Python 가상 환경을 구성할 수 있습니다.

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. 를 실행python하여 Python 셸을 열고 명령을 테스트합니다.

데이터 카탈로그에 연결

에서 Iceberg 테이블 작업을 시작하려면 AWS Glue먼저에 연결해야 합니다 AWS Glue Data Catalog. 

load_catalog함수는 모든 Iceberg 작업에 대한 기본 인터페이스 역할을 하는 카탈로그 객체를 생성하여 데이터 카탈로그에 대한 연결을 초기화합니다.

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

데이터베이스 나열 및 생성

기존 데이터베이스를 나열하려면 list_namespaces 함수를 사용합니다.

databases = glue_catalog.list_namespaces() print(databases)

새 데이터베이스를 생성하려면 create_namespace 함수를 사용합니다.

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Iceberg 테이블 생성 및 작성

분할되지 않은 테이블

다음은 create_table 함수를 사용하여 파티셔닝되지 않은 Iceberg 테이블을 생성하는 예제입니다.

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

list_tables 함수를 사용하여 데이터베이스 내의 테이블 목록을 확인할 수 있습니다.

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

함수와 PyArrow를append 사용하여 Iceberg 테이블 내에 데이터를 삽입할 수 있습니다.

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

분할된 테이블

다음은 create_table 함수 및를 사용하여 숨겨진 파티셔닝이 있는 파티셔닝된 Iceberg 테이블을 생성하는 예제입니다PartitionSpec.

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

파티셔닝되지 않은 테이블과 동일한 방식으로 파티셔닝된 테이블에 데이터를 삽입할 수 있습니다. 파티셔닝은 자동으로 처리됩니다.

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

데이터 읽기

PyIceberg scan 함수를 사용하여 Iceberg 테이블에서 데이터를 읽을 수 있습니다. 행을 필터링하고, 특정 열을 선택하고, 반환된 레코드 수를 제한할 수 있습니다.

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

데이터 삭제

PyIceberg delete 함수를 사용하면 delete_filter를 사용하여 테이블에서 레코드를 제거할 수 있습니다.

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

메타데이터 액세스

PyIceberg는 테이블 메타데이터에 액세스하는 여러 함수를 제공합니다. 테이블 스냅샷에 대한 정보를 보는 방법은 다음과 같습니다.

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

사용 가능한 메타데이터의 자세한 목록은 PyIceberg 설명서의 메타데이터 코드 참조 섹션을 참조하세요.

시간 이동 사용

테이블 스냅샷을 시간 이동에 사용하여 테이블의 이전 상태에 액세스할 수 있습니다. 다음은 마지막 작업 이전의 테이블 상태를 보는 방법입니다.

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

사용 가능한 함수의 전체 목록은 PyIcebergPython API 설명서를 참조하세요.