Trabalhando com tabelas Iceberg usando PyIceberg - AWS Orientação prescritiva

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Trabalhando com tabelas Iceberg usando PyIceberg

Esta seção explica como você pode interagir com as tabelas Iceberg usando PyIceberg. Os exemplos fornecidos são códigos padronizados que você pode executar em EC2 instâncias, AWS Lambdafunções do Amazon Linux 2023 ou em qualquer ambiente Python com credenciais configuradas corretamente.AWS

Pré-requisitos

nota

Esses exemplos usam PyIceberg 1.9.1.

Para trabalhar com PyIceberg, você precisa PyIceberg e AWS SDK para Python (Boto3) instalou. Aqui está um exemplo de como você pode configurar um ambiente virtual Python para trabalhar com PyIceberg e: AWS Glue Data Catalog

  1. Faça o download PyIcebergusando o instalador do pacote pip python. Você também precisa do Boto3 para interagir. Serviços da AWS Você pode configurar um ambiente virtual Python local para testar usando estes comandos:

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. Execute python para abrir o shell do Python e testar os comandos.

Conectando-se ao catálogo de dados

Para começar a trabalhar com tabelas Iceberg em AWS Glue, primeiro você precisa se conectar ao AWS Glue Data Catalog. 

A load_catalog função inicializa uma conexão com o Catálogo de Dados criando um objeto de catálogo que serve como sua interface principal para todas as operações do Iceberg:

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

Listando e criando bancos de dados

Para listar bancos de dados existentes, use a list_namespaces função:

databases = glue_catalog.list_namespaces() print(databases)

Para criar um novo banco de dados, use a create_namespace função:

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Criando e escrevendo tabelas Iceberg

Tabelas não particionadas

Aqui está um exemplo de criação de uma tabela Iceberg não particionada usando a função: create_table

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

Você pode usar a list_tables função para verificar a lista de tabelas dentro de um banco de dados:

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

Você pode usar a append função e PyArrow inserir dados dentro de uma tabela Iceberg:

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

Tabelas particionadas

Aqui está um exemplo de criação de uma tabela Iceberg particionada com particionamento oculto usando a função e: create_table PartitionSpec

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

Você pode inserir dados em uma tabela particionada da mesma forma que em uma tabela não particionada. O particionamento é feito automaticamente.

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

Leitura de dados

Você pode usar a PyIceberg scan função para ler dados de suas tabelas do Iceberg. Você pode filtrar linhas, selecionar colunas específicas e limitar o número de registros retornados.

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

Excluir dados

A PyIceberg delete função permite que você remova registros da sua tabela usandodelete_filter:

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

Acessando metadados

PyIceberg fornece várias funções para acessar os metadados da tabela. Veja como você pode visualizar informações sobre instantâneos de tabelas:

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

Para obter uma lista detalhada dos metadados disponíveis, consulte a seção de referência do código de metadados da PyIceberg documentação.

Usando a viagem no tempo

Você pode usar instantâneos da tabela para viajar no tempo para acessar os estados anteriores da sua tabela. Veja como visualizar o estado da tabela antes da última operação:

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

Para ver uma lista completa das funções disponíveis, consulte a documentação da API PyIceberg Python.