Snowflake Snowpipe, Amazon S3, Amazon SNS 및 Amazon Data Firehose를 사용하여 Snowflake 데이터베이스로 데이터 스트림 수집 자동화 - 권장 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Snowflake Snowpipe, Amazon S3, Amazon SNS 및 Amazon Data Firehose를 사용하여 Snowflake 데이터베이스로 데이터 스트림 수집 자동화

작성자: 비카쉬 찬드라 라우트(AWS)

요약

이 패턴은 Amazon Web Services(AWS) 클라우드에서 서비스를 사용하여 지속적인 데이터 스트림을 처리하고 이를 Snowflake 데이터베이스에 로드하는 방법을 설명합니다. 이 패턴은 Amazon Data Firehose를 사용하여 Amazon Simple Storage Service(Amazon S3), Amazon Simple Notification Service(Amazon SNS)로 데이터를 전송하여 새 데이터가 수신될 때 알림을 보내고, Snowflake Snowpipe를 사용하여 데이터를 Snowflake 데이터베이스로 로드합니다.

이 패턴을 따르면 몇 초 만에 분석에 사용할 수 있는 데이터를 지속적으로 생성하고, 여러 수동 COPY 명령을 피하고, 로드 시 반정형 데이터를 완벽하게 지원할 수 있습니다.

사전 조건 및 제한 사항

사전 조건 

  • 활성. AWS 계정

  • Firehose 전송 스트림으로 데이터를 지속적으로 전송하는 데이터 소스입니다.

  • Firehose 전송 스트림에서 데이터를 수신하는 기존 S3 버킷입니다.

  • 활성 상태의 Snowflake 계정.

제한 사항

  • Snowflake Snowpipe는 Firehose에 직접 연결되지 않습니다.

아키텍처

Firehose에서 수집한 데이터는 Amazon S3, Amazon SNS, Snowflake Snowpipe 및 Snowflake DB로 이동합니다.

기술 스택

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowpipe

  • Snowflake 데이터베이스

도구

  • Amazon Data Firehose는 Amazon S3, Amazon Redshift, Amazon OpenSearch Service, Splunk 및 지원되는 타사 서비스 공급자가 소유한 사용자 지정 HTTP 엔드포인트 또는 HTTP 엔드포인트와 같은 대상에 실시간 스트리밍 데이터를 제공하기 위한 완전관리형 서비스입니다.

  • Amazon Simple Storage Service(Amazon S3)는 인터넷용 스토리지입니다.

  • Amazon Simple Notification Service(Amazon SNS)는 구독 중인 엔드포인트 또는 클라이언트에 메시지를 전달 또는 전송하는 것을 조정하고 관리합니다.

  • Snowflake - Snowflake는 서비스형 소프트웨어(SaaS)로 제공되는 분석 데이터 웨어하우스입니다.

  • Snowflake Snowpipe - Snowpipe는 Snowflake 스테이지에서 파일이 제공되는 즉시 파일에서 데이터를 로드합니다.

에픽

작업설명필요한 기술

Snowflake에서 CSV 파일을 생성합니다.

Snowflake에 로그인하고 CREATE FILE FORMAT 명령을 실행하여 지정된 필드 구분 기호가 있는 CSV 파일을 생성합니다. 이 명령 및 기타 Snowflake 명령에 대한 자세한 내용은 추가 정보 섹션을 참조하세요.

개발자

외부 Snowflake 스테이지를 생성합니다.

CREATE STAGE 명령을 실행하여 이전에 생성한 CSV 파일을 참조하는 외부 Snowflake 단계를 생성합니다. 중요: S3 버킷의 URL, AWS 액세스 키 및 AWS 보안 액세스 키가 필요합니다. SHOW STAGES 명령을 실행하여 Snowflake 단계가 생성되었는지 확인합니다.

개발자

Snowflake 대상 테이블을 생성합니다.

CREATE TABLE 명령을 실행하여 Snowflake 테이블을 생성합니다.

개발자

파이프를 생성합니다.

CREATE PIPE 명령을 실행합니다. auto_ingest=true가 명령에 있는지 확인합니다. SHOW PIPES 명령을 실행하여 파이프가 생성되었는지 확인합니다. notification_channel 열 값을 복사하고 저장합니다. 이 값은 Amazon S3 이벤트 알림을 구성하는 데 사용됩니다.

개발자
작업설명필요한 기술

S3 버킷에 대한 30일 수명 주기 정책을 생성합니다.

에 로그인 AWS Management Console 하고 Amazon S3 콘솔을 엽니다. Firehose의 데이터가 포함된 S3 버킷을 선택합니다. 그런 다음 S3 버킷에서 관리 탭을 선택하고 수명 주기 규칙 추가를 선택합니다. 수명 주기 규칙 대화 상자에 규칙 이름을 입력하고, 버킷의 30일 수명 주기 규칙을 구성합니다. 이 이야기와 다른 이야기에 대한 도움이 필요하면 관련 리소스 섹션을 참조하십시오.

시스템 관리자, 개발자

S3 버킷에 대한 IAM 정책을 생성합니다.

AWS Identity and Access Management (IAM) 콘솔을 열고 정책을 선택합니다. 정책 생성을 선택한 후 JSON 탭을 선택합니다. 추가 정보 섹션의 정책을 복사하여 JSON 필드에 붙여 넣습니다. 이 정책은 PutObjectDeleteObject 권한과 , GetObject GetObjectVersionListBucket 권한을 부여합니다. 정책 검토를 선택하고 정책 이름을 입력한 다음 정책 생성을 선택합니다.

시스템 관리자, 개발자

IAM 역할에 정책을 할당합니다.

IAM 콘솔을 열고 역할을 선택한 다음 역할 생성을 선택합니다. 신뢰할 수 있는 엔터티로 다른 AWS 계정을 선택합니다. AWS 계정 ID를 입력하고 외부 ID 필요를 선택합니다. 나중에 변경할 자리 표시자 ID를 입력합니다. 다음을 선택하고 이전에 생성한 IAM 정책을 할당합니다. 그런 다음 IAM 역할을 생성합니다.

시스템 관리자, 개발자

IAM 역할의 Amazon 리소스 이름(ARN)을 복사합니다.

IAM 콘솔을 열고 역할을 선택합니다. 이전에 생성한 IAM 역할을 선택한 다음 역할 ARN을 복사하고 저장합니다.

시스템 관리자, 개발자
작업설명필요한 기술

Snowflake에서 스토리지 통합을 생성합니다.

Snowflake에 로그인하고 CREATE STORAGE INTEGRATION 명령을 실행합니다. 그러면 신뢰할 수 있는 관계를 수정되고, Snowflake에 대한 액세스 권한이 부여되고, Snowflake 스테이지에 대한 외부 ID가 제공됩니다.

시스템 관리자, 개발자

Snowflake 계정에 대한 IAM 역할을 검색합니다.

DESC INTEGRATION 명령을 실행하여 IAM 역할의 ARN을 검색합니다.

중요

<integration_ name>는 이전에 생성한 Snowflake 스토리지 통합의 이름입니다.

시스템 관리자, 개발자

두 개의 열 값을 기록합니다.

storage_aws_iam_user_arnstorage_aws_external_id 열의 값을 복사하고 저장합니다.

시스템 관리자, 개발자
작업설명필요한 기술

IAM 역할 정책을 수정합니다.

IAM 콘솔을 열고 역할을 선택합니다. 이전에 생성한 IAM 역할을 선택하고 신뢰 관계 탭을 선택합니다. 신뢰 관계 편집을 선택합니다. snowflake_external_id를 이전에 복사한 storage_aws_external_id 값으로 바꿉니다. snowflake_user_arn를 이전에 복사한 storage_aws_iam_user_arn 값으로 바꿉니다. 그런 다음 신뢰 정책 업데이트를 선택합니다.

시스템 관리자, 개발자
작업설명필요한 기술

S3 버킷에 대한 이벤트 알림을 켭니다.

Amazon S3 콘솔을 열고 버킷을 선택합니다. 속성을 선택하고 고급 설정에서 이벤트를 선택합니다. 알림 추가를 선택하고이 이벤트의 이름을 입력합니다. 이름을 입력하지 않으면 전역 고유 식별자(GUID)가 사용됩니다.

시스템 관리자, 개발자

S3 버킷에 대한 Amazon SNS 알림을 구성합니다.

이벤트에서 ObjectCreate(모두)를 선택한 다음 전송 대상 드롭다운 목록에서 SQS 대기열 선택합니다. SNS 목록에서 SQS 대기열 ARN 추가를 선택하고 이전에 복사한 notification_channel 값을 붙여넣습니다. 그런 다음 저장을 선택합니다.

시스템 관리자, 개발자

SNS 주제에 대한 Snowflake SQS 대기열을 구독합니다.

생성한 SNS 주제에 대한 Snowflake SQS 대기열을 구독합니다. 이 단계에 대한 도움말은 관련 리소스 섹션을 참조하세요.

시스템 관리자, 개발자
작업설명필요한 기술

Snowpipe를 확인하고 테스트합니다.

Snowflake에 로그인하고 Snowflake 스테이지를 엽니다. S3 버킷에 파일을 드롭하고 Snowflake 테이블에 파일이 로드되는지 확인합니다. Amazon S3는 S3 버킷에 새 객체가 나타나면 Snowpipe에 SNS 알림을 전송합니다.

시스템 관리자, 개발자

관련 리소스

추가 정보

파일 형식 생성:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

외부 스테이지 생성:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

테이블 생성:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

스테이지 보기:

SHOW STAGES;

파이프 생성:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

파이프 보기:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

스토리지 통합 생성:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

예시

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

이 단계에 관한 자세한 내용은 Snowflake 설명서에서 Amazon S3에 액세스하기 위한 Snowflake 스토리지 통합 구성을 참조하세요.

통합 설명:

DESC INTEGRATION <integration_name>;

S3 버킷 정책:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }