マテリアライズドビューへのストリーミング取り込み - Amazon Redshift

Amazon Redshift は、2025 年 11 月 1 日以降、新しい Python UDF の作成をサポートしなくなります。Python UDF を使用する場合は、その日付より前に UDF を作成してください。既存の Python UDF は引き続き通常どおり機能します。詳細については、ブログ記事を参照してください。

マテリアライズドビューへのストリーミング取り込み

このトピックでは、マテリアライズドビューを使用してストリーミングデータにすばやくアクセスする方法について説明します。

ストリーミング取り込みでは、Amazon Kinesis Data StreamsAmazon Managed Streaming for Apache Kafka から、Amazon Redshift でプロビジョニングされたビューや Amazon Redshift Serverless データベースへの、低レイテンシーかつ高速のデータインジェストを行います。データは、目的に合わせて設定された Redshift マテリアライズドビューに到達します。このため、外部データへのアクセスが高速化されます。ストリーミング取り込みは、データアクセス時間を短縮し、ストレージコストを削減します。ストリーミング取り込みは、SQL コマンドの小さなコレクションを使用して、Amazon Redshift クラスターまたは Amazon Redshift Serverless ワークグループ用に設定できます。設定後は、マテリアライズドビューの更新ごとに、毎秒数百メガバイトのデータを取り込むことができます。

ストリーミングサービスから Redshift へのデータフロー

これにより、ストリーミング取り込みの仕組みやプロセスで使用されるデータベースオブジェクトを理解しやすくなります。データは、データストリームプロバイダーから Amazon Redshift でプロビジョニングされたクラスターまたは Amazon Redshift Serverless ワークグループに直接流れます。Amazon S3 バケットなどの一時的な到着エリアはありません。プロビジョニングされたクラスターやワークグループは、ストリームコンシューマーです。Redshift データベースの場合、ストリームから読み取られたデータはマテリアライズドビューに到着します。データは到着時に処理されます。例えば、SQL を使用して JSON 値を消費し、マテリアライズドビューのデータ列にマッピングできます。マテリアライズドビューを更新すると、Redshift は、ストリームに伴ってビューが最新の状態になるまで、割り当てられた Kinesis データシャードまたは Kafka パーティションのデータを消費します。

Amazon Redshift ストリーミング取り込みのユースケースでは、データの継続的な生成と、生成時点からの短期間 (低レイテンシー) での処理が伴います。これは、一般的に、ほぼリアルタイムの分析と呼ばれます。ソースには、IT デバイス、システムテレメトリデバイス、ビジー状態のウェブサイトやアプリケーションからのクリックストリームデータなどが含まれます。

パフォーマンス改善に向けたデータ解析のベストプラクティス

ストリーミング取り込みを設定する場合、受信データを解析する方法について、いくつかのオプションがあります。プラクティスには、データの到着時にビジネスロジックやフォーマットを実行することが含まれます。エラーやデータ損失を避けるため、以下のベストプラクティスをお勧めします。ベストプラクティスは、内部テストに基づくものであり、設定や解析の問題のトラブルシューティングを行う際に役立ちます。

  • ストリーミングされたデータから値を抽出する — マテリアライズドビュー定義で JSON_EXTRACT_PATH_TEXT 関数を使用してストリーミング JSON を解析または細分化すると、パフォーマンスとレイテンシーに大きな影響を及ぼす可能性があります。具体的には、JSON_EXTRACT_PATH_TEXT を使用して抽出した列ごとに、着信 JSON が再解析されます。これに続けて、データ型の変換、フィルタリング、ビジネスロジックの計算が行われます。例えば、JSON データから 10 列を抽出すると、各 JSON レコードは 10 回解析されます (これには追加のロジックが含まれます)。その結果、取り込みのレイテンシーが長くなります。代わりに、JSON_PARSE 関数を使用して JSON レコードを Redshift の SUPER データ型に変換することをお勧めします。ストリーミングされたデータがマテリアライズドビューに到着したら、PartiQL を使用して JSON データの SUPER 表現から個々の文字列を抽出します。詳細については、「半構造化データのクエリ」を参照してください。

    さらに、JSON_EXTRACT_PATH_TEXT のデータサイズは最大 64 KB であることに注意してください。JSON レコードのサイズが 64 KB を超えると、JSON_EXTRACT_PATH_TEXT での処理エラーになります。

  • Amazon Kinesis Data Streams ストリームまたは Amazon MSK トピックを複数のマテリアライズドビューにマッピングする — 単一のストリームやトピックからデータを取り込むために、複数のマテリアライズドビューを作成することはお勧めしません。その理由として、各マテリアライズドビューは Kafka トピックの Kinesis Data Streams ストリームやパーティション内のシャードごとにコンシューマーを作成するためです。これにより、スロットリングや、ストリームまたはトピックのスループット超過が生じる場合があります。また、同じデータを複数回取り込むことになるため、コストが高くなる可能性もあります。ストリーミング取り込みを設定する場合、ストリームやトピックごとに 1 つのマテリアライズドビューを作成することをお勧めします。

    ユースケースで 1 つの KDS ストリームや MSK トピックから複数のマテリアライズドビューにデータを取り込む必要がある場合は、事前に AWS Big Data Blog の「Amazon MSK で Amazon Redshift ストリーミング取り込みを使用して、ほぼリアルタイムの分析を実装するためのベストプラクティス」を参照してください。

ストリーミング取り込みの動作とデータタイプ

次の表では、データタイプ別の技術的な動作の詳細とサイズ制限について説明します。ストリーミング取り込み用にマテリアライズドビューを設定する前に、以下を理解しておくことをお勧めします。

機能または動作 説明
Kafka トピックの長さ制限

Kafka トピックの名前は 128 文字 (引用符は含まない) を超えることはできません。詳細については、「名前と識別子」を参照してください。

マテリアライズドビューでの増分の更新と JOIN

マテリアライズドビューは、増分的な保守が可能である必要があります。Kinesis や Amazon MSK では、24 時間または 7 日前のストリームやトピックの履歴は保持されないため、完全な再計算は不可能です。Kinesis または Amazon MSK では、より長いデータ保持期間を設定することができます。ただし、これによりメンテナンスとコストが増える可能性があります。また、現在、Kinesis Streams や Amazon MSK トピックで作成されたマテリアライズドビューでは、JOIN の使用はサポートされていません。ストリームやトピックでマテリアライズドビューを作成した後、別のマテリアライズドビューを作成して、ストリーミングのマテリアライズドビューと他のマテリアライズドビュー、テーブル、またはビューとの結合のために使用できます。

詳細については、「REFRESH MATERIALIZED VIEW」を参照してください。

レコード解析

Amazon Redshift のストリーミング取り込みでは、Kinesis プロデューサーライブラリ (KPL の重要なコンセプト) によって集計されたレコードの解析をサポートしていません。集計されたレコードは取り込まれますが、バイナリプロトコルのバッファデータとして格納されます。(詳細については「Protocol buffers」(プロトコルバッファ) を参照してください。) Kinesis へのデータのプッシュ方法によっては、この機能の無効化が必要となる場合があります。

Kafka ヘッダーの重複した値

Amazon MSK、Confluent、または Apache Kafka から取得された Kafka トピックの Amazon Redshift ストリーミングコンシューマークライアントは、Kafka トピックヘッダーの重複した値をサポートしていません。

解凍

VARBYTE は解凍をサポートしていません。このため、圧縮データを含むレコードを Redshift でクエリすることはできません。データは、Kinesis ストリームや Amazon MSK トピックに追加する前に解凍してください。

レコードの最大サイズ

Amazon Redshift がストリーミングサービスから取り込むことができるレコードの最大サイズは 16,777,216 バイト (16 MiB) で、これは Amazon Redshift の VARBYTE データ型でサポートされる最大サイズです。ただし、Kinesis は最大レコードサイズ 1,048,576 バイト (1 MiB) のみをサポートします。Amazon MSK は、最大レコードサイズ 16,777,216 バイト (16 MiB) をサポートします。したがって、デフォルトで、Kinesis データストリームで作成された Amazon Redshift ストリーミングマテリアライズドビューは VARBYTE データ型列のサイズを 1,048,576 バイト (1 MiB) に設定し、Amazon MSK トピックで作成された Amazon Redshift ストリーミングマテリアライズドビューは VARBYTE データ列のサイズを 16,777,216 バイト (16 MiB) に設定します。

Kinesis のサイズ制限の詳細については、「Amazon Kinesis Data Streams 開発者ガイド」の「Quotas and limits」を参照してください。
エラーレコード

データが最大サイズを超えているためにレコードを Redshift に取り込めない場合、そのレコードはスキップされます。この場合でも、マテリアライズドビューの更新は成功し、各エラーレコードのセグメントが SYS_STREAM_SCAN_ERRORS システムテーブルに書き込まれます。計算のエラーやタイプ変換によるエラーなど、ビジネスロジックに起因するエラーはスキップされません。ロジックは、マテリアライズドビュー定義に追加する前に、入念にテストしてください。

Amazon MSK マルチ VPC プライベート接続

Amazon MSK マルチ VPC プライベート接続は、現在 Redshift ストリーミングの取り込みではサポートされていません。代わりに、VPC ピアリングを使用して VPC に接続するか、AWS Transit Gateway を使用しセントラルハブを介して VPC およびオンプレミスネットワークに接続することができます。これらのいずれかにより、Redshift は Amazon MSK クラスターまたは別の VPC にある Amazon MSK サーバーレスと通信できるようになります。

自動更新の使用とアクティブ化

マテリアライズドビューに対する自動更新クエリは、他のユーザーワークロードと同様に扱われます。自動更新は、ストリームが到達するとデータをロードします。

ストリーミング取り込み用に作成されたマテリアライズドビューでは、自動更新を明示的にオンにできます。これを行うには、マテリアライズドビュー定義で AUTO REFRESH を指定します。手動更新がデフォルトです。ストリーミング取り込み用の既存のマテリアライズドビューに自動更新を指定するには、ALTER MATERIALIZED VIEW を実行してオンにします。詳細については、「CREATE MATERIALIZED VIEW」または「ALTER MATERIALIZED VIEW」を参照してください。

ストリーミング取り込みと Amazon Redshift Serverless

プロビジョニングされたクラスターで Amazon Redshift ストリーミング取り込みに適用されるセットアップと設定の手順は、Amazon Redshift Serverless でのストリーミング取り込みにも適用されます。自動更新やその他のワークロードでストリーミング取り込みをサポートするには、必要なレベルの RPU を指定することが重要です。詳細については、「Amazon Redshift Serverless の料金」を参照してください。

Amazon MSK クラスターとは異なるアベイラビリティーゾーンの Amazon Redshift ノード

ストリーミングの取り込みを設定すると、Amazon MSK のラック認識が有効になっている場合、Amazon Redshift は同じアベイラビリティーゾーンの Amazon MSK クラスターへの接続を試みます。すべてのノードが Amazon Redshift クラスターとは異なるアベイラビリティーゾーンにある場合、アベイラビリティーゾーン間のデータ転送コストが発生する可能性があります。これを回避するには、Redshift のプロビジョニングされたクラスターまたはワークグループと同じ AZ に少なくとも 1 つの Amazon MSK ブローカークラスターノードを保持します。

更新の開始場所

マテリアライズドビューを作成すると、最初の更新は Kinesis ストリームの TRIM_HORIZON または Amazon MSK トピックのオフセット 0 から始まります。

データ形式

サポートされるデータ形式は、VARBYTE からの変換が可能なデータ形式に限られます。詳細については、「VARBYTE 型」および「VARBYTE 演算子」を参照してください。

テーブルへのレコードの追加

既存のソースマテリアライズドビューからターゲットテーブルに行を追加するには、ALTER TABLE APPEND を実行できます。これは、マテリアライズドビューがストリーミング取り込み用に設定されている場合にのみ機能します。詳細については、「ALTER TABLE APPEND」を参照してください。

ALTER TABLE APPEND オペレーションは、次のいずれかに接続された Amazon Redshift ストリーミングマテリアライズドビューで実行されると、排他的ロックを保持します。

  • 1 つの Amazon Kinesis Data Streams

  • 1 つの Amazon Managed Streaming for Apache Kafka トピック

  • Confluent Cloud Kafka トピックなど、サポートされている外部ストリーム

TRUNCATE または DELETE の実行

ストリーミング取り込みに使用するマテリアライズドビューからレコードを削除するには、以下を使用します。

  • TRUNCATE — ストリーミング取り込み用に設定されたマテリアライズドビューからすべての行を削除します。テーブルスキャンは行われません。詳細については、「TRUNCATE」を参照してください。

  • DELETE — ストリーミング取り込み用に設定されたマテリアライズドビューからすべての行を削除します。詳細については、「DELETE」を参照してください。

TRUNCATE および DELETE オペレーションは、次のいずれかに接続された Amazon Redshift ストリーミングマテリアライズドビューで実行されると、排他的ロックを保持します。

  • 1 つの Amazon Kinesis Data Streams

  • 1 つの Amazon Managed Streaming for Apache Kafka トピック

  • Confluent Cloud Kafka トピックなど、サポートされている外部ストリーム

小文字以外の識別子

小文字以外の識別子を含む Amazon Managed Streaming for Apache Kafka トピックまたは Kinesis Data Streams でストリーミングマテリアライズドビューを作成すると、自動更新が失敗する可能性があります。これを解決するには、次のいずれかを実行します。

  • enable_case_sensitive_identifier パラメータを true に設定して手動更新を使用する

  • データベースまたはクラスターレベルで enable_case_sensitive_identifiertrue に設定して自動更新を有効にする

注記

ユーザーレベルで enable_case_sensitive_identifier を設定することは、自動更新には不十分ですが、手動更新では機能します。

大文字と小文字を区別する識別子の詳細については、「enable_case_sensitive_identifier」を参照してください。

べき等性

Amazon Redshift は、ストリーミングソースからデータを取り込むときに各レコードが 1 回だけ処理されることを保証します。この保証は、Amazon Kinesis (ストリーム、シャード、シーケンス番号識別子を使用) と Apache Kafka (トピック、パーティション、オフセット識別子を使用) の 2 種類のソースに適用されます。これには、Amazon Managed Streaming for Apache Kafka (Amazon MSK) と Confluent Cloud が含まれます。