翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Snowflake Snowpipe、Amazon S3、Amazon SNS、Amazon Data Firehose を使用して Snowflake データベースへのデータストリームの取り込みを自動化します
Bikash Chandra Rout (Amazon Web Services)
概要
このパターンは、Amazon Web Services (AWS) クラウド上のサービスを使用して、連続したデータストリームを処理し、Snowflake データベースにロードする方法を説明しています。このパターンでは、Amazon Data Firehose を使用して Amazon Simple Storage Service (Amazon S3) にデータを配信し、新しいデータを受信したときに Amazon Simple Notification Service (Amazon SNS) を使用して通知を送信し、Snowflake Snowpipe を使用して Snowflake データベースにデータをロードします。
このパターンに従うことで、継続的に生成されるデータを数秒で分析できるようになり、手動の COPY コマンドを複数回実行する必要がなくなり、ロード時の半構造化データを完全にサポートできます。
前提条件と制限事項
前提条件
アクティブ AWS アカウント。
Firehose 配信ストリームに継続的にデータを送信する、データソース。
Firehose 配信ストリームからデータを受信する、既存の S3 バケット。
アクティブな Snowflake アカウント。
制限事項
Snowflake Snowpipe は、Firehose には直接接続しません。
アーキテクチャ

テクノロジースタック
Amazon Data Firehose
Amazon SNS
Amazon S3
Snowflake Snowflake Snowpipe
Snowflake データレイク
ツール
Amazon Data Firehose は、Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk、カスタム HTTP エンドポイント、またはサポートされているサードパーティーのサービスプロバイダーが所有する HTTP エンドポイントなどの宛先にリアルタイムのストリーミングデータを配信するためのフルマネージドサービスです。
Amazon Simple Storage Service (Amazon S3) は、インターネット用のストレージです。
Amazon Simple Notification Service (Amazon SNS) は、サブスクライブしているエンドポイントやクライアントへのメッセージ配信や送信を調整および管理します。
Snowflake
– Snowflakeは、Software as a Service (SaaS) として提供される分析データウェアハウスです。 Snowflake Snowpipe
— Snowpipe は、スノーフレークの段階でファイルが利用可能になるとすぐにファイルからデータをロードします。
エピック
| タスク | 説明 | 必要なスキル |
|---|---|---|
Snowflake で CSV ファイルの作成 | Snowflake にサインインし、 | 開発者 |
外部の Snowflake ステージを作成します。 |
| 開発者 |
Snowflake ターゲットテーブルを作成します。 |
| 開発者 |
パイプを作成します。 |
| 開発者 |
| タスク | 説明 | 必要なスキル |
|---|---|---|
S3 バケットの 30 日間のライフサイクルポリシーを作成します。 | にサインイン AWS マネジメントコンソール し、Amazon S3 コンソールを開きます。Firehose からのデータを含む S3 バケットを選択します。次に S3 バケットの [管理] タブを選択し、[ライフサイクルルールを追加] を選択します。[ライフサイクルルール] ダイアログボックスにルールの名前を入力し、バケットの 30 日間のライフサイクルルールを設定します。このストーリーやその他のストーリーに関するヘルプは、関連リソースのセクションを参照してください。 | システム管理者、開発者 |
S3 バケット用の IAM ポリシーを作成します。 | AWS Identity and Access Management (IAM) コンソールを開き、ポリシーを選択します。[Create policy] (ポリシーの作成) を選択し、[JSON] タブを選択します。追加情報のセクションにあるポリシーをコピーして、JSON フィールドに貼り付けます。このポリシーは、 | システム管理者、開発者 |
ポリシーを IAM ロールにアサインします。 | IAM コンソールで [ロール]、[ロールの作成] の順に選択します。[別の AWS アカウント] を信頼されたエンティティとして選択します。 AWS アカウント ID を入力し、外部 ID を要求するを選択します。後で変更するプレースホルダー ID を入力します。[次へ] を選択し、前に作成した IAM ポリシーを割り当てます。IAM ロールを作成します。 | システム管理者、開発者 |
IAM ロールの Amazon リソースネーム (ARN) をメモします。 | IAM コンソールを開き、[ロール] を選択します。前に作成した IAM ロールを選択し、[ロール ARN] をコピーして保存します。 | システム管理者、開発者 |
| タスク | 説明 | 必要なスキル |
|---|---|---|
Snowflake でクラウドストレージの統合を作成します。 | Snowflake にサインインし、 | システム管理者、開発者 |
Snowflake アカウントの IAM ユーザーを取得します。 |
重要
| システム管理者、開発者 |
2 つの列の値を記録します。 |
| システム管理者、開発者 |
| タスク | 説明 | 必要なスキル |
|---|---|---|
IAM ロールポリシーを変更します。 | IAM コンソールを開き、[Roles] (ロール) を選択します。前に作成した IAM ロールを選択し、[信頼関係] タブを選択します。[Edit trust relationship (信頼関係の編集)] を選択します。 | システム管理者、開発者 |
| タスク | 説明 | 必要なスキル |
|---|---|---|
S3 バケットのイベント通知をオンにします。 | Amazon S3 コンソールを開き、 バケットを見つけます。[プロパティ] を選択し、[詳細設定] で [イベント] を選択します。[通知を追加] を選択し、このイベントの名前を入力します。名前を入力しない場合は、グローバル一意識別子 (GUID) が使用されます。 | システム管理者、開発者 |
通知用に Amazon SNS トピックを設定する | [イベント] で [ObjectCreate (All)] を選択し、[送信先] ドロップダウンリストから [SQS キュー] を選択します。[SNS] リストで [SQS キューの ARN を追加] を選択し、前にコピーした | システム管理者、開発者 |
Snowflake SQS キューを SNS トピックにサブスクライブする。 | Snowflake SQS キューを作成した SNS トピックにサブスクライブします。このステップに関するヘルプは、関連リソースのセクションを参照してください。 | システム管理者、開発者 |
| タスク | 説明 | 必要なスキル |
|---|---|---|
Snowpipeをチェックしてテストしてください。 | Snowflakeにサインインし、スノーフレークステージを開きます。S3 バケットにファイルをドロップし、Snowflake テーブルに読み込まれるかどうかを確認します。S3 バケットに新しいオブジェクトが表示されると、Amazon S3 は SNS 通知を Snowpipe に送信します。 | システム管理者、開発者 |
関連リソース
追加情報
ファイル形式の作成:
CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;
外部ステージの作成:
externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )
テーブルの作成:
CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]
ショーステージ:
SHOW STAGES;
パイプを作成します:
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS
パイプを表示:
SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
ストレージインテグレーションの作成:
CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
例:
create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');
このステップに関する詳細については、Snowflake のドキュメントにある Amazon S3 にアクセスするための Snowflake ストレージ統合の設定
インテグレーションの説明:
DESC INTEGRATION <integration_name>;
S3 バケットポリシー
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }