

 Amazon Redshift は、パッチ 198 以降、新しい Python UDF の作成をサポートしなくなります。既存の Python UDF は、2026 年 6 月 30 日まで引き続き機能します。詳細については、[ブログ記事](https://aws.amazon.com/blogs/big-data/amazon-redshift-python-user-defined-functions-will-reach-end-of-support-after-june-30-2026/)を参照してください。

# Amazon Kinesis Data Streams からストリーミング取り込みを開始する方法
<a name="materialized-view-streaming-ingestion-getting-started"></a>

このトピックでは、マテリアライズドビューを使用して Kinesis Data Streams からストリーミングデータを使用する方法について説明します。

 Amazon Redshift ストリーミング取り込みを設定する際は、外部スキーマを作成しストリーミングのデータソースにマッピングした上で、その外部スキーマを参照するマテリアライズドビューを作成します。Amazon Redshift のストリーミング取り込みでは、データソースとして Kinesis Data Streams をサポートします。そのため、ストリーミング取り込みを設定する前に、Kinesis Data Streams ソースを使用可能な状態にしておく必要があります。ソースがない場合は、Kinesis ドキュメント「[Amazon Kinesis Data Streams の開始方法](https://docs.aws.amazon.com/streams/latest/dev/getting-started.html)」の手順に従います。または「[AWS マネジメントコンソールを介してのストリームの作成](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-create-a-stream.html)」の手順に従って、コンソールからソースを作成します。

 Amazon Redshift のストリーミング取り込みではマテリアライズドビューが使用されます。マテリアライズドビューは、`REFRESH`実行時にストリームにより直接更新されます。マテリアライズドビューは、ストリームのデータソースにマッピングされています。マテリアライズドビューの定義を行う際には、ストリームデータをフィルタリングしたり集計したりできます。ストリーミング取り込みのマテリアライズドビュー (*基盤の*マテリアライズドビュー) は 1 つのストリームのみを参照します。ただし、追加のマテリアライズドビューを作成して、それを基盤または別のマテリアライズドビュー、あるいはテーブルと結合させることができます。

**注記**  
*ストリーミング取り込みと Amazon Redshift Serverless* - このトピックの設定手順は Amazon Redshift クラスターおよび Amazon Redshift Serverless の両方に適用されます。詳細については、「[ストリーミング取り込みの動作とデータタイプ](materialized-view-streaming-ingestion.md#materialized-view-streaming-ingestion-limitations)」を参照してください。

Kinesis Data Streams のストリームが利用可能な場合の最初のステップは、`CREATE EXTERNAL SCHEMA` を使用して Amazon Redshift 内にスキーマを定義することです。その上で、Kinesis Data Streams リソースを参照します。その後、ストリーム内のデータにアクセスするために、マテリアライズドビュー内で `STREAM` を定義します。ストリームレコードは半構造的な `SUPER` 形式で保存できます。あるいは、Redshift データ型に変換されたデータを出力するスキーマを定義することができます。マテリアライズドビューをクエリすると、返されるレコードにはその時点のストリームが反映されます。

1. Amazon Redshift クラスターまたは Amazon Redshift Serverless ワークグループがロールを引き受けることを許可する信頼ポリシーを持つ IAM ロールを作成します。IAM ロール向けに信頼ポリシーを設定する方法については、「[ユーザーに代わって Amazon Redshift が他の AWS サービスにアクセスすることを許可する](https://docs.aws.amazon.com/redshift/latest/mgmt/authorizing-redshift-service.html)」を参照してください。作成されたロールには次の IAM ポリシーが設定されており、これにより、Amazon Kinesis のデータストリームとの通信に関するアクセス許可が提供されます。

   **Kinesis データストリームからの暗号化されていないストリームの IAM ポリシー**

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStream"
               ],
               "Resource": "arn:aws:kinesis:*:111122223333:stream/*"
           },
           {
               "Sid": "ListStream",
               "Effect": "Allow",
               "Action": "kinesis:ListStreams",
               "Resource": "*"
           }
       ]
   }
   ```

------

   **Kinesis データストリームからの暗号化されたストリームの IAM ポリシー**

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadStream",
               "Effect": "Allow",
               "Action": [
                   "kinesis:DescribeStreamSummary",
                   "kinesis:GetShardIterator",
                   "kinesis:GetRecords",
                   "kinesis:ListShards",
                   "kinesis:DescribeStream"
               ],
               "Resource": "arn:aws:kinesis:*:111122223333:stream/*"
           },
           {
               "Sid": "DecryptStream",
               "Effect": "Allow",
               "Action": [
                   "kms:Decrypt"
               ],
               "Resource": "arn:aws:kms:us-east-1:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab"
           },
           {
               "Sid": "ListStream",
               "Effect": "Allow",
               "Action": "kinesis:ListStreams",
               "Resource": "*"
           }
       ]
   }
   ```

------

1. VPC をチェックして、Amazon Redshift クラスターまたは Amazon Redshift Serverless に NAT ゲートウェイまたはインターネットゲートウェイを使用して、インターネット経由で Kinesis Data Streams エンドポイントに到達するルートがあることを確認します。Redshift と Kinesis Data Streams 間のトラフィックを AWS ネットワーク内に残しておきたい場合は、Kinesis インターフェイス VPC エンドポイントの使用を検討してください。詳細については、「[Using Amazon Kinesis Data Streams Kinesis Data Streams with Interface VPC Endpoints](https://docs.aws.amazon.com/streams/latest/dev/vpc.html)」(Amazon Kinesis Data Streamsでのインターフェイス VPC エンドポイントの使用) を参照してください。

1. Amazon Redshift で、Kinesis からのデータをスキーマにマッピングするために、外部スキーマを作成します。

   ```
   CREATE EXTERNAL SCHEMA kds
   FROM KINESIS
   IAM_ROLE { default | 'iam-role-arn' };
   ```

    Kinesis Data Streams のストリーミング取り込みには認証タイプは必要ありません。`CREATE EXTERNAL SCHEMA` ステートメントで定義されている IAM ロールを使用して Kinesis Data Streams リクエストを行います。

    オプション: REGION キーワードを使用して、Amazon Kinesis Data Streams または Amazon MSK ストリームが存在するリージョンを指定します。

   ```
   CREATE EXTERNAL SCHEMA kds
   FROM KINESIS
   REGION 'us-west-2'
   IAM_ROLE { default | 'iam-role-arn' };
   ```

   このサンプルでは、リージョンがソースストリームの場所を指定します。IAM\$1ROLE はサンプルです。

1. ストリームデータを利用するためのマテリアライズドビューを作成します。次のようなステートメントの場合、レコードを解析できないと、エラーが発生します。エラーレコードをスキップしない場合は、このようなコマンドを使用します。

   ```
   CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS
   SELECT *
   FROM kds.my_stream_name;
   ```

   Kinesis のストリームの名前では大文字と小文字が区別され、その両方を使用することができます。名前が大文字のストリームからインジェストするには、データベースレベルで設定 `enable_case_sensitive_identifier` を `true` に指定できます。詳細については、「[名前と識別子](https://docs.aws.amazon.com/redshift/latest/dg/r_names.html)」ならびに「[enable\$1case\$1sensitive\$1identifier](https://docs.aws.amazon.com/redshift/latest/dg/r_enable_case_sensitive_identifier.html)」を参照してください。

   自動更新をオンにするには、`AUTO REFRESH YES` を使用してください。デフォルトの動作は手動更新です。CAN\$1JSON\$1PARSE を使用する場合、解析できないレコードはスキップされる可能性があることに注意してください。

   メタデータ列には次の列が含まれます。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/redshift/latest/dg/materialized-view-streaming-ingestion-getting-started.html)

   マテリアライズドビュー定義内のビジネスロジックによっては、ビジネスロジックのエラーに伴ってストリーミング取り込みがブロックされることに注意してください。場合によっては、マテリアライズドビューを削除して再作成しなければならないことがあります。これを回避するには、ロジックをできるだけシンプルにし、ビジネスロジックのチェックのほとんどをインジェスト後のデータに実行することをお勧めします。

1. ビューを更新します。これにより、Redshift を呼び出してストリームから読み取り、データをマテリアライズドビューにロードします。

   ```
   REFRESH MATERIALIZED VIEW my_view;
   ```

1. マテリアライズドビュー内でデータをクエリします。

   ```
   select * from my_view;
   ```