Snowflake Snowpipe、Amazon S3、Amazon SNS、Amazon Data Firehose を使用して Snowflake データベースへのデータストリームの取り込みを自動化します - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Snowflake Snowpipe、Amazon S3、Amazon SNS、Amazon Data Firehose を使用して Snowflake データベースへのデータストリームの取り込みを自動化します

Bikash Chandra Rout (Amazon Web Services)

概要

このパターンは、Amazon Web Services (AWS) クラウド上のサービスを使用して、連続したデータストリームを処理し、Snowflake データベースにロードする方法を説明しています。このパターンでは、Amazon Data Firehose を使用して Amazon Simple Storage Service (Amazon S3) にデータを配信し、新しいデータを受信したときに Amazon Simple Notification Service (Amazon SNS) を使用して通知を送信し、Snowflake Snowpipe を使用して Snowflake データベースにデータをロードします。

このパターンに従うことで、継続的に生成されるデータを数秒で分析できるようになり、手動の COPY コマンドを複数回実行する必要がなくなり、ロード時の半構造化データを完全にサポートできます。

前提条件と制限事項

前提条件

  • アクティブ AWS アカウント。

  • Firehose 配信ストリームに継続的にデータを送信する、データソース。

  • Firehose 配信ストリームからデータを受信する、既存の S3 バケット。

  • アクティブな Snowflake アカウント。

制限事項

  • Snowflake Snowpipe は、Firehose には直接接続しません。

アーキテクチャ

Firehose によって取り込まれたデータは、Amazon S3、Amazon SNS、Snowflake Snowpipe、Snowflake DB に送信されます。

テクノロジースタック

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowflake Snowpipe

  • Snowflake データレイク

ツール

  • Amazon Data Firehose は、Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk、カスタム HTTP エンドポイント、またはサポートされているサードパーティーのサービスプロバイダーが所有する HTTP エンドポイントなどの宛先にリアルタイムのストリーミングデータを配信するためのフルマネージドサービスです。

  • Amazon Simple Storage Service (Amazon S3) は、インターネット用のストレージです。

  • Amazon Simple Notification Service (Amazon SNS) は、サブスクライブしているエンドポイントやクライアントへのメッセージ配信や送信を調整および管理します。

  • Snowflake – Snowflakeは、Software as a Service (SaaS) として提供される分析データウェアハウスです。

  • Snowflake Snowpipe — Snowpipe は、スノーフレークの段階でファイルが利用可能になるとすぐにファイルからデータをロードします。

エピック

タスク説明必要なスキル

Snowflake で CSV ファイルの作成

Snowflake にサインインし、CREATE FILE FORMAT コマンドを実行して、指定されたフィールド区切り文字の CSV ファイルを作成します。このコマンドやその他 Snowflake コマンドの詳細については、追加情報のセクションを参照してください。

開発者

外部の Snowflake ステージを作成します。

CREATE STAGE コマンドを実行して、前に作成した CSV ファイルを参照する外部の Snowflake ステージを作成します。重要: S3 バケットの URL、 AWS アクセスキー、シー AWS クレットアクセスキーが必要です。SHOW STAGES コマンドを実行して、Snowflake ステージが作成されていることを確認します。

開発者

Snowflake ターゲットテーブルを作成します。

CREATE TABLE コマンドを実行してスノーフレークテーブルを作成します。

開発者

パイプを作成します。

CREATE PIPE コマンドを実行します。auto_ingest=true がこのコマンドに含まれていることを確認します。SHOW PIPES コマンドを実行して、このパイプが作成されていることを確認します。notification_channel 列の値をコピーして保存します。この値は Amazon S3 イベント通知の設定に使用されます。

開発者
タスク説明必要なスキル

S3 バケットの 30 日間のライフサイクルポリシーを作成します。

にサインイン AWS マネジメントコンソール し、Amazon S3 コンソールを開きます。Firehose からのデータを含む S3 バケットを選択します。次に S3 バケットの [管理] タブを選択し、[ライフサイクルルールを追加] を選択します。[ライフサイクルルール] ダイアログボックスにルールの名前を入力し、バケットの 30 日間のライフサイクルルールを設定します。このストーリーやその他のストーリーに関するヘルプは、関連リソースのセクションを参照してください。

システム管理者、開発者

S3 バケット用の IAM ポリシーを作成します。

AWS Identity and Access Management (IAM) コンソールを開き、ポリシーを選択します。[Create policy] (ポリシーの作成) を選択し、[JSON] タブを選択します。追加情報のセクションにあるポリシーをコピーして、JSON フィールドに貼り付けます。このポリシーは、PutObjectDeleteObject のアクセス許可に加え、GetObjectGetObjectVersionListBucket のアクセス許可を付与します。[ポリシーの確認] を選択し、ポリシーの名前を入力して、[ポリシーの作成] を選択します。

システム管理者、開発者

ポリシーを IAM ロールにアサインします。

IAM コンソールで [ロール][ロールの作成] の順に選択します。[別の AWS アカウント] を信頼されたエンティティとして選択します。 AWS アカウント ID を入力し、外部 ID を要求するを選択します。後で変更するプレースホルダー ID を入力します。[次へ] を選択し、前に作成した IAM ポリシーを割り当てます。IAM ロールを作成します。

システム管理者、開発者

IAM ロールの Amazon リソースネーム (ARN) をメモします。

IAM コンソールを開き、[ロール] を選択します。前に作成した IAM ロールを選択し、[ロール ARN] をコピーして保存します。

システム管理者、開発者
タスク説明必要なスキル

Snowflake でクラウドストレージの統合を作成します。

Snowflake にサインインし、CREATE STORAGE INTEGRATION コマンドを実行します。これにより、信頼関係が変更され、Snowflakeへのアクセス権が付与され、Snowflakeステージの外部 ID が提供されます。

システム管理者、開発者

Snowflake アカウントの IAM ユーザーを取得します。

DESC INTEGRATION コマンドを実行して IAM ロールの ARN を取得します。

重要

<integration_ name> は、前に作成した Snowflake のストレージ統合の名前です。

システム管理者、開発者

2 つの列の値を記録します。

storage_aws_iam_user_arn 列と storage_aws_external_id 列の値をコピーして保存します。

システム管理者、開発者
タスク説明必要なスキル

IAM ロールポリシーを変更します。

IAM コンソールを開き、[Roles] (ロール) を選択します。前に作成した IAM ロールを選択し、[信頼関係] タブを選択します。[Edit trust relationship (信頼関係の編集)] を選択します。snowflake_external_id を、前にコピーした storage_aws_external_id の値に置き換えます。snowflake_user_arn を、前にコピーした storage_aws_iam_user_arn の値に置き換えます。次に、[信頼ポリシーの更新] を選択します。

システム管理者、開発者
タスク説明必要なスキル

S3 バケットのイベント通知をオンにします。

Amazon S3 コンソールを開き、 バケットを見つけます。[プロパティ] を選択し、[詳細設定][イベント] を選択します。[通知を追加] を選択し、このイベントの名前を入力します。名前を入力しない場合は、グローバル一意識別子 (GUID) が使用されます。

システム管理者、開発者

通知用に Amazon SNS トピックを設定する

[イベント][ObjectCreate (All)] を選択し、[送信先] ドロップダウンリストから [SQS キュー] を選択します。[SNS] リストで [SQS キューの ARN を追加] を選択し、前にコピーした notification_channel 値を貼り付けます。次に、[Save (保存)] を選択します。

システム管理者、開発者

Snowflake SQS キューを SNS トピックにサブスクライブする。

Snowflake SQS キューを作成した SNS トピックにサブスクライブします。このステップに関するヘルプは、関連リソースのセクションを参照してください。

システム管理者、開発者
タスク説明必要なスキル

Snowpipeをチェックしてテストしてください。

Snowflakeにサインインし、スノーフレークステージを開きます。S3 バケットにファイルをドロップし、Snowflake テーブルに読み込まれるかどうかを確認します。S3 バケットに新しいオブジェクトが表示されると、Amazon S3 は SNS 通知を Snowpipe に送信します。

システム管理者、開発者

関連リソース

追加情報

ファイル形式の作成:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

外部ステージの作成:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

テーブルの作成:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

ショーステージ:

SHOW STAGES;

パイプを作成します:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

パイプを表示:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

ストレージインテグレーションの作成:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

例:

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

このステップに関する詳細については、Snowflake のドキュメントにある Amazon S3 にアクセスするための Snowflake ストレージ統合の設定を参照してください。

インテグレーションの説明:

DESC INTEGRATION <integration_name>;

S3 バケットポリシー

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }