使用 Snowflake Snowpipe、Amazon S3、Amazon SNS 和 Amazon Data Firehose 自动将数据流摄取至 Snowflake 数据库 - AWS 规范指引

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Snowflake Snowpipe、Amazon S3、Amazon SNS 和 Amazon Data Firehose 自动将数据流摄取至 Snowflake 数据库

Bikash Chandra Rout,Amazon Web Services

Summary

此模式描述了如何使用 Amazon Web Services(AWS)云上的服务处理连续数据流,并将其加载至 Snowflake 数据库。此模式使用 Amazon Data Firehose 将数据传输至 Amazon Simple Storage Service(Amazon S3),使用 Amazon Simple Notification Service(Amazon SNS)在收到新数据时发送通知,使用 Snowflake Snowpipe 将数据加载至 Snowflake 数据库。

通过遵循此模式,您可以在几秒钟内持续生成可供分析的数据,避免使用多个手动 COPY 命令,并且完全支持加载时的半结构化数据。

先决条件和限制

先决条件

  • 活跃 AWS 账户的.

  • 持续将数据发送至 Firehose 传输流的数据来源。

  • 从 Kinesis 传输流接收数据的现有 S3 存储桶。

  • 一个活跃的 Snowflake 账户。

限制

  • Snowflake Snowpipe 无法直接连接至 Firehose。

架构

Firehose 摄取的数据将进入 Amazon S3、Amazon SNS、Snowflake Snowpipe 和 Snowflake 数据库。

技术堆栈

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowpipe

  • Snowflake 数据库

工具

操作说明

Task说明所需技能

在 Snowflake 中创建 CSV 格式文件。

登录 Snowflake 并运行 CREATE FILE FORMAT 命令,以创建具有指定字段分隔符的 CSV 文件。有关此命令和其他 Snowflake 命令的更多信息,请参阅其他信息部分。

开发者版

创建外部 Snowflake 阶段。

运行 CREATE STAGE 命令,以创建一个引用您之前创建的 CSV 文件的外部 Snowflake 阶段。重要:您将需要 S3 存储桶的 URL、 AWS 访问密钥和私有访问 AWS 密钥。运行 SHOW STAGES 命令,以验证 Snowflake 阶段是否已创建。

开发者版

创建 Snowflake 目标表。

运行 CREATE TABLE 命令,以创建 Snowflake 表。

开发者版

创建管道。

运行 CREATE PIPE 命令;确保命令中包含 auto_ingest=true。运行 SHOW PIPES 命令,以验证管道是否已创建。复制并保存 notification_channel 列的值。此值可用于配置 Amazon S3 事件通知。

开发者版
Task说明所需技能

为 S3 存储桶创建 30 天的生命周期策略。

登录 AWS 管理控制台 并打开 Amazon S3 控制台。选择包含来自 Firehose 数据的 S3 存储桶。然后在 S3 存储桶中选择管理选项卡,再选择添加生命周期规则。在生命周期规则对话框内输入规则名称,并为存储桶配置 30 天生命周期规则。要获取有关此操作和其他操作的帮助,请参阅相关资源部分。

系统管理员、开发人员

为 S3 存储桶创建 IAM 策略。

打开 AWS Identity and Access Management (IAM) 控制台并选择策略。选择创建策略,然后选择 JSON 选项卡。将策略从其他信息部分复制并粘贴至 JSON 字段。此策略将授予 PutObjectDeleteObject 权限,以及GetObjectGetObjectVersionListBucket 权限。选择查看策略,输入策略名称,然后选择创建策略

系统管理员、开发人员

将该策略分配至 IAM 角色。

打开 IAM 控制台,选择角色,然后选择创建角色。选择其他 AWS 账户作为可信实体。输入您的 AWS 账户 ID,然后选择 “需要外部身份证”。输入占位符 ID,稍后将对其进行更改。选择下一步,并分配您之前创建的 IAM 策略。然后创建 IAM 角色。

系统管理员、开发人员

复制 IAM 角色的 Amazon 资源名称(ARN)。

打开 IAM 控制台,选择角色。选择您此前创建的 IAM 角色,然后复制并存储角色 ARN

系统管理员、开发人员
Task说明所需技能

在 Snowflake 创建存储集成。

登录 Snowflake,并运行 CREATE STORAGE INTEGRATION 命令。这将修改信任关系,授予 Snowflake 访问权限,并为您的 Snowflake 阶段提供外部 ID。

系统管理员、开发人员

为您的 Snowflake 账户检索 IAM 角色。

运行 DESC INTEGRATION 命令,以检索 IAM 角色的 ARN。

重要

<integration_ name> 是您之前创建的 Snowflake 存储集成的名称。

系统管理员、开发人员

记录两列的值。

复制并保存 storage_aws_iam_user_arnstorage_aws_external_id 列的值。

系统管理员、开发人员
Task说明所需技能

修改 IAM 角色策略。

打开 IAM 控制台,选择角色。选择您此前创建的 IAM 角色,然后选择信任关系选项卡。选择编辑信任关系。将 snowflake_external_id 替换为您之前复制的 storage_aws_external_id 值。将 snowflake_user_arn 替换为您之前复制的 storage_aws_iam_user_arn 值。然后选择更新信任策略

系统管理员、开发人员
Task说明所需技能

打开 S3 存储桶事件通知。

打开 Amazon S3 控制台并选择存储桶。选择属性,然后在高级设置下选择事件。选择添加通知,然后输入此事件名称。如果未输入名称,则使用全局唯一标识符 (GUID)。

系统管理员、开发人员

为 S3 存储桶配置 Amazon SNS 通知。

在 “事件” 下,选择 ObjectCreate (全部),然后在 “发送至” 下拉列表中选择 SQS 队列。在 SNS 列表中,选择添加 SQS 队列 ARN,然后粘贴之前复制的 notification_channel 值。然后选择保存

系统管理员、开发人员

为 Snowflake SQS 队列订阅 SNS 主题。

为 Snowflake SQS 队列订阅您创建的 SNS 主题。有关此步骤的帮助,请参阅相关资源部分。

系统管理员、开发人员
Task说明所需技能

检查并测试 Snowpipe。

登录 Snowflake 并打开 Snowflake 阶段。将文件拖放至 S3 存储桶,然后检查 Snowflake 表是否已加载这些文件。当 S3 存储桶中显示新对象时,Amazon S3 将向 Snowpipe 发送 SNS 通知。

系统管理员、开发人员

相关资源

附加信息

创建文件格式:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

创建外部阶段:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

创建表:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

显示阶段:

SHOW STAGES;

创建管道:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

显示管道:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

创建存储集成:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

示例:

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

有关此步骤的更多信息,请参阅 Snowflake 文档中的配置 Snowflake 存储集成以访问 Amazon S3

描述集成:

DESC INTEGRATION <integration_name>;

S3 存储桶策略:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }