Trino를 사용하여 Iceberg 테이블 작업 - AWS 권장 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Trino를 사용하여 Iceberg 테이블 작업

이 섹션에서는 Amazon EMR에서 Trino를 사용하여 Iceberg 테이블을 설정하고 운영하는 방법을 설명합니다. 이 예제는 Amazon EMR on EC2 클러스터에서 실행할 수 있는 표준 문안 코드입니다. 이 섹션의 코드 예제 및 구성에서는 Amazon EMR 릴리스 emr-7.9.0을 사용한다고 가정합니다.

Amazon EMR on EC2 설정

  1. 다음 콘텐츠가 포함된 iceberg.properties파일을 생성합니다. CREATE TABLE 문에 형식이 명시적으로 지정되지 않은 경우 iceberg.file-format=parquet 설정에 따라 새 테이블의 기본 스토리지 형식이 결정됩니다.

    connector.name=iceberg iceberg.catalog.type=glue iceberg.file-format=parquet fs.native-s3.enabled=true
  2. iceberg.properties 파일을 S3 버킷에 업로드합니다. 

  3. S3 버킷에서 iceberg.properties파일을 복사하여 생성할 Amazon EMR 클러스터에 Trino 구성 파일로 저장하는 부트스트랩 작업을 생성합니다.를 S3 버킷 이름으로 <S3-bucket-name>바꿔야 합니다. 

    #!/bin/bash set -ex sudo aws s3 cp s3://<S3-bucket-name>/iceberg.properties /etc/trino/conf/catalog/iceberg.properties
  4. Trino가 설치된 Amazon EMR 클러스터를 생성하고 이전 스크립트의 실행을 부트스트랩 작업으로 지정합니다. 다음은 클러스터를 생성하기 위한 sample AWS Command Line Interface (AWS CLI) 명령입니다.

    aws emr create-cluster --release-label emr-7.9.0 \ --applications Name=Trino \ --region <region> \ --name Trino_Iceberg_Cluster \ --bootstrap-actions '[{"Path":"s3://<S3-bucket-name>/bootstrap.sh","Name":"Add iceberg.properties"}]' \ --instance-groups '[{"InstanceGroupType":"MASTER","InstanceCount":1,"InstanceType":"m5.xlarge"},{"InstanceGroupType":"CORE","InstanceCount":3,"InstanceType":"m5.xlarge"}]' \ --service-role "<IAM-service-role>" \ --ec2-attributes '{"KeyName":"<key-name>","InstanceProfile":"<EMR-EC2-instance-profile>"}'

    여기서 다음을 바꿉니다.

    • <S3-bucket-name> S3 버킷 이름 사용

    • <region> 특정를 사용하는 AWS 리전

    • <key-name>를 키 페어와 함께 사용합니다. 키 페어가 없는 경우 키 페어가 생성됩니다.

    • <IAM-service-role> 최소 권한 원칙을 따르는 Amazon EMR 서비스 역할을 사용합니다. 

    • <EMR-EC2-instance-profile>인스턴스 프로파일과 함께 사용합니다. 

  5. Amazon EMR 클러스터가 초기화되면 다음 명령을 실행하여 Trino 세션을 초기화할 수 있습니다.

    trino-cli
  6. Trino CLI에서 다음을 실행하여 카탈로그를 볼 수 있습니다.

    SHOW CATALOGS;

Iceberg 테이블 생성

Iceberg 테이블을 생성하려면 CREATE TABLE 문을 사용할 수 있습니다.   다음은 Iceberg 숨겨진 파티셔닝을 사용하는 파티셔닝된 테이블을 생성하는 예입니다.

CREATE TABLE iceberg.iceberg_db.iceberg_table ( userid int, firstname varchar, city varchar) WITH ( format = 'PARQUET', partitioning = ARRAY['city', 'bucket(userid, 16)'], location = 's3://<S3-bucket>/<prefix>');
참고

형식을 지정하지 않으면 이전 섹션에서 구성한 iceberg.file-format 값이 사용됩니다.

데이터를 삽입하려면 INSERT INTO 명령을 사용합니다. 다음은 그 예입니다.

INSERT INTO iceberg.iceberg_db.iceberg_table (userid, firstname, city) VALUES (1001, 'John', 'New York'), (1002, 'Mary', 'Los Angeles'), (1003, 'Mateo', 'Chicago'), (1004, 'Shirley', 'Houston'), (1005, 'Diego', 'Miami'), (1006, 'Nikki', 'Seattle'), (1007, 'Pat', 'Boston'), (1008, 'Terry', 'San Francisco'), (1009, 'Richard', 'Denver'), (1010, 'Pat', 'Phoenix');

Iceberg 테이블에서 읽기

다음과 같이 SELECT 문을 사용하여 Iceberg 테이블의 최신 상태를 읽을 수 있습니다.

SELECT * FROM iceberg.iceberg_db.iceberg_table;

Iceberg 테이블로 데이터 업서스팅

MERGE INTO 문을 사용하여 업서트 작업을 수행할 수 있습니다(새 레코드를 동시에 삽입하고 기존 레코드를 업데이트). 다음은 그 예입니다.

MERGE INTO iceberg.iceberg_db.iceberg_table target USING ( VALUES (1001, 'John Updated', 'Boston'), -- Update existing user (1002, 'Mary Updated', 'Seattle'), -- Update existing user (1011, 'Martha', 'Portland'), -- Insert new user (1012, 'Paulo', 'Austin') -- Insert new user ) AS source (userid, firstname, city) ON target.userid = source.userid WHEN MATCHED THEN UPDATE SET firstname = source.firstname, city = source.city WHEN NOT MATCHED THEN INSERT (userid, firstname, city) VALUES (source.userid, source.firstname, source.city);

Iceberg 테이블에서 레코드 삭제

Iceberg 테이블에서 데이터를 삭제하려면 DELETE FROM 표현식을 사용하고 삭제할 행과 일치하는 필터를 지정합니다. 다음은 그 예입니다.

DELETE FROM iceberg.iceberg_db.iceberg_table WHERE userid IN (1003, 1004);

Iceberg 테이블 메타데이터 쿼리

Iceberg는 SQL을 통해 메타데이터에 대한 액세스를 제공합니다. 네임스페이스를 쿼리하여 지정된 테이블(<table_name>)의 메타데이터에 액세스할 수 있습니다"<table_name>.$<metadata_table>". 메타데이터 테이블의 전체 목록은 Iceberg 설명서의 테이블 검사를 참조하세요.

다음은 Iceberg 메타데이터를 검사하기 위한 쿼리 목록의 예입니다.

SELECT FROM iceberg.iceberg_db."iceberg_table$snapshots"; SELECT FROM iceberg.iceberg_db."iceberg_table$history"; SELECT FROM iceberg.iceberg_db."iceberg_table$partitions"; SELECT FROM iceberg.iceberg_db."iceberg_table$files"; SELECT FROM iceberg.iceberg_db."iceberg_table$manifests"; SELECT FROM iceberg.iceberg_db."iceberg_table$refs"; SELECT * FROM iceberg.iceberg_db."iceberg_table$metadata_log_entries";

예를 들어, 쿼리는 다음과 같습니다.

SELECT * FROM iceberg.iceberg_db."iceberg_table$snapshots";

는 출력을 제공합니다.

Iceberg 테이블 메타데이터 쿼리의 출력입니다.

시간 이동 사용

Iceberg 테이블의 각 쓰기 작업(삽입, 업데이트, 업서트 또는 삭제)은 새 스냅샷을 생성합니다. 그런 다음 이러한 스냅샷을 시간 이동에 사용하여 과거로 돌아가서 과거의 테이블 상태를 확인할 수 있습니다.

다음 시간 이동 쿼리는 특정를 기반으로 테이블의 상태를 표시합니다. snapshot_id

SELECT * FROM iceberg.iceberg_db.iceberg_table FOR VERSION AS OF 241938428756831817;

다음 시간 이동 쿼리는 특정 타임스탬프를 기반으로 테이블의 상태를 표시합니다.

SELECT * FROM iceberg.iceberg_db.iceberg_table FOR TIMESTAMP AS OF TIMESTAMP '2025-05-28 16:09:40.268 UTC'

Iceberg를 Trino와 함께 사용할 때 고려 사항

Iceberg 테이블의 Trino 쓰기 작업은 merge-on-read 설계를 따르므로 업데이트 또는 삭제의 영향을 받는 전체 데이터 파일을 다시 쓰는 대신 위치 삭제 파일을 생성합니다. copy-on-write 접근 방식을 사용하려면 쓰기 작업에 Spark를 사용하는 것이 좋습니다.