Firehose ストリームを設定する
Apache Iceberg テーブルを送信先として Firehose ストリームを作成するには、次を設定する必要があります。
注記
S3 テーブルバケット内のテーブルに配信するための Firehose ストリームの設定は、Amazon S3 の Apache Iceberg テーブルと同じです。
ソースと宛先を設定する
Apache Iceberg テーブルにデータを配信するには、ストリームのソースを選択します。
ストリームのソースを設定するには、「Configure source settings」を参照してください。
次に、宛先として [Apache Iceberg テーブル] を選択し、Firehose ストリーム名を指定します。
データ変換を設定する
着信ストリーム内のレコードの追加や変更など、データに対してカスタム変換を実行するには、Firehose ストリームに Lambda 関数を追加できます。Firehose ストリームでの Lambda を使用したデータ変換の詳細については、「Amazon Data Firehose でソースデータを変換する」を参照してください。
Apache Iceberg テーブルでは、着信レコードを異なる送信先テーブルにルーティングする方法と、実行するオペレーションを指定する必要があります。必要なルーティング情報を Firehose に提供する方法の 1 つは、Lambda 関数を使用することです。
詳細については、「Route records to different Iceberg tables」を参照してください。
データカタログを接続する
Apache Iceberg には、Apache Iceberg テーブルに書き込むデータカタログが必要です。Firehose は、Apache Iceberg テーブルのために AWS Glue Data Catalog と統合します。
Firehose ストリームと同じアカウント、またはクロスアカウント、および Firehose ストリームと同じリージョン (デフォルト)、または別のリージョンで AWS Glue Data Catalog を使用できます。
Amazon S3 テーブルに配信している場合、およびコンソールを使用して Firehose ストリームを設定している場合は、Amazon S3 テーブルカタログに対応するカタログを選択します。CLI を使用して Firehose ストリームを設定している場合は、CatalogConfiguration 入力で arn:aws:glue:<region>:<account-id>:catalog/s3tablescatalog/<s3
table bucket name> という形式の CatalogARN を使用します。詳細については、「Setting up a Firehose stream to Amazon S3 tables」を参照してください。
注記
Firehose は Iceberg テーブルに対して、挿入、更新、削除の 3 種類のオペレーションをサポートしています。オペレーションが指定されていない場合、Firehose はデフォルトで挿入を実行し、各着信レコードを新しい行として追加し、重複を保持します。既存のレコードを変更するには、プライマリキーを使用して既存の行を検索および変更する「更新」オペレーションを指定します。
例:
デフォルト (挿入): 同一のカスタマーレコードが複数存在すると、重複行が作成されます。
指定された更新: 新しいカスタマーアドレスによって既存のレコードが更新されます。
JQ 式を設定する
Apache Iceberg テーブルでは、着信レコードを異なる宛先テーブルにルーティングする方法と、挿入、更新、削除などの実行するオペレーションを指定する必要があります。これを実行するには、Firehose のために JQ 式を設定して、必要な情報を解析および取得します。詳細については、「JSONQuery 式を使用して Firehose にルーティング情報を提供する」を参照してください。
一意のキーを設定する
複数のテーブルを使用した更新と削除 – 一意のキーは、ソースレコード内の 1 つ以上のフィールドであり、Apache Iceberg テーブルの行を一意に識別します。複数のテーブルを含むシナリオのみを挿入する場合は、一意のキーを設定する必要はありません。特定のテーブルで更新と削除を実行する場合は、必要なテーブルのために一意のキーを設定する必要があります。テーブルの行が欠落している場合、更新によって行が自動的に挿入されることに留意してください。テーブルが 1 つしかない場合は、一意のキーを設定できます。更新オペレーションの場合、Firehose は削除ファイルを配置し、その後に挿入を実行します。
Firehose ストリーム作成の一環としてテーブルごとに一意のキーを設定するか、または create tableidentifier-field-ids をチェックし、それらを一意のキーとして使用します。両方が設定されていない場合、更新オペレーションと削除オペレーションを使用したデータの配信は失敗します。
このセクションを設定するには、データを更新または削除するテーブルのデータベース名、テーブル名、一意のキーを入力します。設定内の各テーブルのためにのみエントリを持つことができます。追加のみのシナリオでは、このセクションを設定する必要はありません。オプションで、次の例に示すように、テーブルからのデータが配信に失敗した場合に備えてエラーバケットプレフィックスを指定することもできます。
[ { "DestinationDatabaseName": "MySampleDatabase", "DestinationTableName": "MySampleTable", "UniqueKeys": [ "COLUMN_PLACEHOLDER" ], "S3ErrorOutputPrefix": "OPTIONAL_PREFIX_PLACEHOLDER" } ]
指定された列名がテーブル全体で一意である場合、Firehose は一意のキーの設定をサポートします。ただし、完全修飾列名は一意のキーとしてサポートされていません。例えば、top._id という名前のキーは、列名 _id が最上位レベルにも存在する場合、一意のキーとは見なされません。_id がテーブル全体で一意である場合、テーブル構造内の場所に関係なく使用されます。つまり、最上位列であるか、ネストされた列であるかは関係ありません。次の例では、列名がスキーマ全体で一意であるため、_id はスキーマの有効な一意のキーです。
[ "schema": { "type": "struct", "fields": [ { "name": "top", "type": { "type": "struct", "fields": [ { "name": "_id", "type": "string" }, { "name": "name", "type": "string" } ] } }, { "name": "user", "type": "string" } ] } ]
次の例では、_id は最上位列とネストされた構造体の両方で使用されるため、スキーマの有効な一意のキーではありません。
[ "schema": { "type": "struct", "fields": [ { "name": "top", "type": { "type": "struct", "fields": [ { "name": "_id", "type": "string" }, { "name": "name", "type": "string" } ] } }, { "name": "_id", "type": "string" } ] } ]
再試行期間を指定する
この設定を使用して、Amazon S3 の Apache Iceberg テーブルへの書き込みで障害が発生した場合に Firehose が再試行を試みる期間 (秒) を指定できます。再試行を実行するには、0~7,200 秒の任意の値を設定できます。デフォルトでは、Firehose は 300 秒間再試行します。
失敗した配信または処理に対応する
再試行期間の経過後にストリームの処理または配信に障害が発生した場合に備えて、レコードを S3 バックアップバケットに配信するように Firehose を設定する必要があります。これを実行するには、コンソールの [バックアップ設定] から、S3 バックアップバケットと S3 バックアップバケットのエラー出力プレフィックスを設定します。
エラー処理
Firehose は、すべての配信エラーを CloudWatch Logs と Amazon S3 エラーバケットに送信します。
エラーのリスト:
|
エラーメッセージ |
説明 |
|---|---|
|
|
Firehose は存在しないテーブルに書き込んでいるか、テーブルが V2 形式ではありません。Firehose は V1 形式のテーブルをサポートしていません。 |
|
|
null または空のテーブル名が渡されたか、テーブルが V2 形式ではありません。Firehose は V1 形式のテーブルをサポートしていません。 |
|
|
前提条件のステップで作成された IAM ロールに必要なアクセス許可と信頼ポリシーがあることを確認します。 |
|
|
前提条件のステップで作成された IAM ロールに必要なアクセス許可と信頼ポリシーがあることを確認します。 |
バッファリングのヒントを設定する
Firehose は、着信ストリーミングデータを、特定のサイズ (バッファリングサイズ)、および特定の期間 (バッファリング間隔) にわたって、メモリにバッファリングしてから、Apache Iceberg テーブルに配信します。バッファリングサイズ (1~128 MB) およびバッファリング間隔 (0~900 秒) を選択できます。バッファリングのヒントが大きいほど、S3 書き込みの回数が少なくなり、データファイルがより大きいために圧縮コストが低減され、クエリランタイムはより高速になります。ただし、レイテンシーは高くなります。バッファリングのヒントの値が小さいと、レイテンシーが低くなります。
詳細設定の設定
Apache Iceberg テーブルに対して、サーバー側の暗号化、エラーログ記録、アクセス許可、およびタグを設定できます。詳細については、「詳細設定の設定」を参照してください。Apache Iceberg テーブルを宛先として使用するための前提条件 の一部として作成した IAM ロールを追加する必要があります。Firehose は、AWS Glue テーブルにアクセスして Amazon S3 バケットに書き込むロールを引き受けます。
Firehose ストリームの作成が完了するまでに数分かかる場合があります。Firehose ストリームが正常に作成されたら、そのストリームへのデータの取り込みを開始し、Apache Iceberg テーブルにデータを表示できます。