Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Trabajar con tablas Iceberg mediante PyIceberg
En esta sección se explica cómo puede interactuar con las tablas Iceberg mediante el uso de. PyIceberg
Requisitos previos
Para trabajar con PyIceberg él, necesita una PyIceberg AWS SDK para Python (Boto3) instalación. A continuación, se muestra un ejemplo de cómo puede configurar un entorno virtual de Python para trabajar con él PyIceberg y AWS Glue Data Catalog:
-
Descárguelo PyIceberg
mediante el instalador del paquete pip python . También necesitas Boto3 para interactuar con él . Servicios de AWS Puede configurar un entorno virtual Python local para realizar pruebas mediante estos comandos: python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
-
Ejecute
python
para abrir el shell de Python y probar los comandos.
Conexión al catálogo de datos
Para empezar a trabajar con las tablas Iceberg AWS Glue, primero tiene que conectarse al AWS Glue Data Catalog.
La load_catalog
función inicializa una conexión al catálogo de datos mediante la creación de un objeto de catálogo
from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )
Listar y crear bases de datos
Para enumerar las bases de datos existentes, utilice la list_namespaces
función:
databases = glue_catalog.list_namespaces() print(databases)
Para crear una base de datos nueva, utilice la create_namespace
función:
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
Crear y escribir tablas Iceberg
Tablas no particionadas
Este es un ejemplo de cómo crear una tabla Iceberg sin particiones mediante la función: create_table
from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
Puedes usar la list_tables
función para comprobar la lista de tablas de una base de datos:
tables = glue_catalog.list_tables(namespace=database_name) print(tables)
Puede usar la append
función e PyArrow insertar datos dentro de una tabla Iceberg:
import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)
Tablas particionadas
Este es un ejemplo de cómo crear una tabla Iceberg particionadacreate_table
PartitionSpec
from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )
Puedes insertar datos en una tabla particionada del mismo modo que en una tabla sin particiones. La partición se gestiona automáticamente.
from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)
Lectura de datos
Puede utilizar la PyIceberg scan
función para leer datos de sus tablas Iceberg. Puede filtrar filas, seleccionar columnas específicas y limitar el número de registros devueltos.
table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)
Eliminación de datos
La PyIceberg delete
función le permite eliminar registros de la tabla mediantedelete_filter
:
table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")
Acceder a los metadatos
PyIceberg proporciona varias funciones para acceder a los metadatos de las tablas. A continuación, le indicamos cómo puede ver la información sobre las instantáneas de las tablas:
#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")
Para obtener una lista detallada de los metadatos disponibles, consulta la sección de referencia de códigos de metadatos
Uso del viaje en el tiempo
Puede utilizar instantáneas de tablas para viajar en el tiempo para acceder a estados anteriores de la tabla. A continuación, te explicamos cómo ver el estado de la tabla antes de la última operación:
second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)
Para obtener una lista completa de las funciones disponibles, consulta la documentación de la API de PyIceberg Python