翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Spark を使用した Apache Iceberg テーブルの操作
このセクションでは、Apache Spark を使用して Iceberg テーブルを操作する方法の概要を説明します。例は、Amazon EMR または で実行できる定型コードです AWS Glue。
注: Iceberg テーブルを操作するためのプライマリインターフェイスは SQL であるため、ほとんどの例では Spark SQL と DataFrames API が組み合わされます。
Iceberg テーブルの作成と書き込み
Spark SQL と Spark DataFrames を使用して、Iceberg テーブルにデータを作成して追加できます。
Spark SQL の使用
Iceberg データセットを記述するには、 CREATE TABLE
や などの標準の Spark SQL ステートメントを使用しますINSERT INTO
。
パーティション化されていないテーブル
Spark SQL を使用してパーティション分割されていない Iceberg テーブルを作成する例を次に示します。
spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)
パーティション化されていないテーブルにデータを挿入するには、標準INSERT INTO
ステートメントを使用します。
spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)
パーティションテーブル
Spark SQL を使用してパーティション化された Iceberg テーブルを作成する例を次に示します。
spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)
Spark SQL を使用してパーティション化された Iceberg テーブルにデータを挿入するには、標準INSERT INTO
ステートメントを使用します。
spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)
注記
Iceberg 1.5.0 以降、パーティションテーブルにデータを挿入する場合、hash
書き込み分散モードがデフォルトになります。詳細については、Iceberg ドキュメントの「Writing Distribution Modes
DataFrames API の使用
Iceberg データセットを記述するには、 DataFrameWriterV2
API を使用できます。
Iceberg テーブルを作成し、そのテーブルにデータを書き込むには、df.writeTo(
t) 関数を使用します。テーブルが存在する場合は、 .append()
関数を使用します。そうでない場合は、.create().
次の例で を使用します。これは .createOrReplace()
.create()
に相当する のバリエーションですCREATE OR REPLACE TABLE AS SELECT
。
パーティション化されていないテーブル
DataFrameWriterV2
API を使用してパーティション分割されていない Iceberg テーブルを作成して入力するには:
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()
DataFrameWriterV2
API を使用して、パーティション分割されていない既存の Iceberg テーブルにデータを挿入するには:
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()
パーティションテーブル
DataFrameWriterV2
API を使用してパーティション化された Iceberg テーブルを作成して入力するには:
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()
DataFrameWriterV2
API を使用してパーティション化された Iceberg テーブルにデータを挿入するには:
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()
Iceberg テーブルのデータの更新
次の例は、Iceberg テーブルのデータを更新する方法を示しています。この例では、c_customer_sk
列に偶数を持つすべての行を変更します。
spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)
このオペレーションはデフォルトのcopy-on-write戦略を使用するため、影響を受けるすべてのデータファイルを書き換えます。
Iceberg テーブルのデータの更新
データの更新とは、新しいデータレコードを挿入し、既存のデータレコードを 1 回のトランザクションで更新することです。データを Iceberg テーブルにアップサートするには、 SQL MERGE INTO
ステートメントを使用します。
次の例では、テーブル 内のテーブル {UPSERT_TABLE_NAME
} の内容をアップサートします{TABLE_NAME}
。
spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
-
にある顧客レコードが同じ
{TABLE_NAME}
に{UPSERT_TABLE_NAME}
既に存在する場合c_customer_id
、{UPSERT_TABLE_NAME}
レコードc_email_address
値は既存の値を上書きします (更新オペレーション)。 -
にある顧客レコード
{UPSERT_TABLE_NAME}
が に存在しない場合{TABLE_NAME}
、{UPSERT_TABLE_NAME}
レコードは{TABLE_NAME}
(オペレーションを挿入) に追加されます。
Iceberg テーブル内のデータの削除
Iceberg テーブルからデータを削除するには、 DELETE FROM
式を使用して、削除する行に一致するフィルターを指定します。
spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)
フィルターがパーティション全体と一致する場合、Iceberg はメタデータのみの削除を実行し、データファイルを配置したままにします。それ以外の場合は、影響を受けるデータファイルのみを書き換えます。
delete メソッドは、 WHERE
句の影響を受けるデータファイルを取得し、削除されたレコードなしでそれらのコピーを作成します。次に、新しいデータファイルを指す新しいテーブルスナップショットを作成します。したがって、削除されたレコードはテーブルの古いスナップショットにまだ存在します。たとえば、テーブルの前のスナップショットを取得すると、先ほど削除したデータが表示されます。クリーンアップの目的で関連データファイルを使用して不要な古いスナップショットを削除する方法については、このガイドの後半の「圧縮を使用してファイルを維持する」セクションを参照してください。
データの読み込み
Spark SQL と DataFrames の両方で、Spark の Iceberg テーブルの最新ステータスを読み取ることができます。
Spark SQL の使用例:
spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)
DataFrames API の使用例:
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
タイムトラベルの使用
Iceberg テーブルの各書き込みオペレーション (挿入、更新、アップサート、削除) は、新しいスナップショットを作成します。その後、これらのスナップショットをタイムトラベルに使用できます。タイムトラベルをさかのぼって、過去のテーブルのステータスを確認できます。
snapshot-id
およびタイミング値を使用してテーブルのスナップショットの履歴を取得する方法については、このガイドの後半にある「メタデータへのアクセス」セクションを参照してください。
次のタイムトラベルクエリは、特定の に基づいてテーブルのステータスを表示しますsnapshot-id
。
Spark SQL の使用:
spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)
DataFrames API の使用:
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)
次のタイムトラベルクエリは、特定のタイムスタンプより前に作成された最後のスナップショットに基づいて、テーブルのステータスをミリ秒 () 単位で表示しますas-of-timestamp
。
Spark SQL の使用:
spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)
DataFrames API の使用:
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)
増分クエリの使用
Iceberg スナップショットを使用して、追加したデータを段階的に読み取ることもできます。
注: 現在、このオペレーションはappend
スナップショットからのデータの読み取りをサポートしています。replace
、、 overwrite
などのオペレーションからのデータの取得はサポートされていませんdelete
。 さらに、増分読み取りオペレーションは Spark SQL 構文ではサポートされていません。
次の の例では、スナップショット start-snapshot-id
(排他的) と end-snapshot-id
(包括的) の間に Iceberg テーブルに追加されたすべてのレコードを取得します。
df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )
メタデータへのアクセス
Iceberg は SQL を介してメタデータへのアクセスを提供します。名前空間 をクエリすることで、任意のテーブル (<table_name>
) のメタデータにアクセスできます<table_name>.<metadata_table>
。メタデータテーブルの完全なリストについては、Iceberg ドキュメントの「テーブルの検査
次の例は、Iceberg テーブルのコミット (変更) の履歴を示す Iceberg 履歴メタデータテーブルにアクセスする方法を示しています。
Amazon EMR Studio ノートブックからの Spark SQL (%%sql
マジックを使用) の使用:
Spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)
DataFrames API の使用:
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
サンプル出力:
