Mengotomatiskan konsumsi aliran data ke dalam database Snowflake dengan menggunakan Snowflake Snowpipe, Amazon S3, Amazon SNS, dan Amazon Data Firehose - AWS Prescriptive Guidance

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengotomatiskan konsumsi aliran data ke dalam database Snowflake dengan menggunakan Snowflake Snowpipe, Amazon S3, Amazon SNS, dan Amazon Data Firehose

Bikash Chandra Rout, Amazon Web Services

Ringkasan

Pola ini menjelaskan bagaimana Anda dapat menggunakan layanan di Amazon Web Services (AWS) Cloud untuk memproses aliran data yang berkelanjutan dan memuatnya ke database Snowflake. Pola ini menggunakan Amazon Data Firehose untuk mengirimkan data ke Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS) untuk mengirim notifikasi saat data baru diterima, dan Snowflake Snowpipe untuk memuat data ke database Snowflake.

Dengan mengikuti pola ini, Anda dapat terus menghasilkan data yang tersedia untuk analisis dalam hitungan detik, menghindari beberapa COPY perintah manual, dan memiliki dukungan penuh untuk data semi-terstruktur saat dimuat.

Prasyarat dan batasan

Prasyarat

  • Aktif Akun AWS.

  • Sumber data yang terus mengirim data ke aliran pengiriman Firehose.

  • Bucket S3 yang sudah ada yang menerima data dari aliran pengiriman Firehose.

  • Akun Snowflake yang aktif.

Batasan

  • Snowflake Snowpipe tidak terhubung langsung ke Firehose.

Arsitektur

Data yang dicerna oleh Firehose masuk ke Amazon S3, Amazon SNS, Snowflake Snowpipe, dan Snowflake DB.

Tumpukan teknologi

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Pipa Salju Kepingan Salju

  • Database kepingan salju

Alat

  • Amazon Data Firehose adalah layanan yang dikelola sepenuhnya untuk mengirimkan data streaming real-time ke tujuan seperti Amazon S3, Amazon Redshift, OpenSearch Amazon Service, Splunk, dan titik akhir HTTP kustom atau titik akhir HTTP apa pun yang dimiliki oleh penyedia layanan pihak ketiga yang didukung.

  • Amazon Simple Storage Service (Amazon S3) adalah penyimpanan untuk internet.

  • Amazon Simple Notification Service (Amazon SNS) mengoordinasikan dan mengelola penyampaian atau pengiriman pesan ke titik akhir atau klien yang berlangganan .

  • Snowflake — Snowflake adalah gudang data analitik yang disediakan sebagai (SaaS). Software-as-a-Service

  • Snowflake Snowpipe — Snowpipe memuat data dari file segera setelah tersedia dalam tahap Snowflake.

Epik

TugasDeskripsiKeterampilan yang dibutuhkan

Buat file CSV di Snowflake.

Masuk ke Snowflake dan jalankan CREATE FILE FORMAT perintah untuk membuat file CSV dengan pembatas bidang tertentu. Untuk informasi lebih lanjut tentang ini dan perintah Snowflake lainnya, lihat bagian Informasi tambahan.

Developer

Buat panggung Snowflake eksternal.

Jalankan CREATE STAGE perintah untuk membuat tahap Snowflake eksternal yang mereferensikan file CSV yang Anda buat sebelumnya. Penting: Anda memerlukan URL untuk bucket S3, kunci AWS akses, dan kunci akses AWS rahasia Anda. Jalankan SHOW STAGES perintah untuk memverifikasi bahwa tahap Snowflake dibuat.

Developer

Buat tabel target Snowflake.

Jalankan CREATE TABLE perintah untuk membuat tabel Snowflake.

Developer

Buat pipa.

Jalankan CREATE PIPE perintah; pastikan itu auto_ingest=true ada di perintah. Jalankan SHOW PIPES perintah untuk memverifikasi bahwa pipa dibuat. Salin dan simpan nilai notification_channel kolom. Nilai ini akan digunakan untuk mengonfigurasi pemberitahuan acara Amazon S3.

Developer
TugasDeskripsiKeterampilan yang dibutuhkan

Buat kebijakan siklus hidup 30 hari untuk bucket S3.

Masuk ke Konsol Manajemen AWS dan buka konsol Amazon S3. Pilih bucket S3 yang berisi data dari Firehose. Kemudian pilih tab Manajemen di bucket S3 dan pilih Tambahkan aturan siklus hidup. Masukkan nama aturan Anda di kotak dialog Aturan Siklus Hidup, dan konfigurasikan aturan siklus hidup 30 hari untuk bucket Anda. Untuk bantuan tentang ini dan cerita lainnya, lihat bagian Sumber daya terkait.

Administrator Sistem, Pengembang

Buat kebijakan IAM untuk bucket S3.

Buka konsol AWS Identity and Access Management (IAM) dan pilih Kebijakan. Pilih Buat kebijakan, dan pilih tab JSON. Salin dan tempel kebijakan dari bagian Informasi tambahan ke bidang JSON. Kebijakan ini akan memberikan PutObject dan DeleteObject izin, serta GetObjectGetObjectVersion, dan ListBucket izin. Pilih Kebijakan ulasan, masukkan nama kebijakan, lalu pilih Buat kebijakan.

Administrator Sistem, Pengembang

Tetapkan kebijakan ke peran IAM.

Buka konsol IAM, pilih Peran, lalu pilih Buat peran. Pilih akun AWS lain sebagai entitas tepercaya. Masukkan Akun AWS ID Anda, dan pilih Memerlukan ID eksternal. Masukkan ID placeholder yang akan Anda ubah nanti. Pilih Berikutnya, dan tetapkan kebijakan IAM yang Anda buat sebelumnya. Kemudian buat peran IAM.

Administrator Sistem, Pengembang

Salin Nama Sumber Daya Amazon (ARN) untuk peran IAM.

Buka konsol IAM, dan pilih Peran. Pilih peran IAM yang Anda buat sebelumnya, lalu salin dan simpan ARN Peran.

Administrator Sistem, Pengembang
TugasDeskripsiKeterampilan yang dibutuhkan

Buat integrasi penyimpanan di Snowflake.

Masuk ke Snowflake dan jalankan perintah. CREATE STORAGE INTEGRATION Ini akan mengubah hubungan tepercaya, memberikan akses ke Snowflake, dan memberikan ID eksternal untuk tahap Snowflake Anda.

Administrator Sistem, Pengembang

Ambil peran IAM untuk akun Snowflake Anda.

Jalankan DESC INTEGRATION perintah untuk mengambil ARN untuk peran IAM.

penting

<integration_ name>adalah nama integrasi penyimpanan Snowflake yang Anda buat sebelumnya.

Administrator Sistem, Pengembang

Rekam dua nilai kolom.

Salin dan simpan nilai untuk storage_aws_external_id kolom storage_aws_iam_user_arn dan.

Administrator Sistem, Pengembang
TugasDeskripsiKeterampilan yang dibutuhkan

Ubah kebijakan peran IAM.

Buka konsol IAM dan pilih Peran. Pilih peran IAM yang Anda buat sebelumnya dan pilih tab Trust relationship. Pilih Edit trust relationship (Edit Hubungan Kepercayaan). Ganti snowflake_external_id dengan storage_aws_external_id nilai yang Anda salin sebelumnya. Ganti snowflake_user_arn dengan storage_aws_iam_user_arn nilai yang Anda salin sebelumnya. Kemudian pilih Perbarui kebijakan kepercayaan.

Administrator Sistem, Pengembang
TugasDeskripsiKeterampilan yang dibutuhkan

Aktifkan notifikasi acara untuk bucket S3.

Buka konsol Amazon S3 dan pilih bucket Anda. Pilih Properti, dan di bawah Pengaturan lanjutan s, pilih Acara. Pilih Tambahkan pemberitahuan, dan masukkan nama untuk acara ini. Jika Anda tidak memasukkan nama, pengidentifikasi unik global (GUID) akan digunakan.

Administrator Sistem, Pengembang

Konfigurasikan notifikasi Amazon SNS untuk bucket S3.

Di bawah Peristiwa, pilih ObjectCreate (Semua), lalu pilih SQS Queue di daftar dropdown Kirim ke. Dalam daftar SNS, pilih Add SQS queue ARN, dan paste notification_channel nilai yang Anda salin sebelumnya. Lalu, pilih Simpan.

Administrator Sistem, Pengembang

Berlangganan antrian Snowflake SQS ke topik SNS.

Berlangganan antrian Snowflake SQS ke topik SNS yang Anda buat. Untuk bantuan dengan langkah ini, lihat bagian Sumber daya terkait.

Administrator Sistem, Pengembang
TugasDeskripsiKeterampilan yang dibutuhkan

Periksa dan uji Snowpipe.

Masuk ke Snowflake dan buka panggung Snowflake. Jatuhkan file ke dalam ember S3 Anda dan periksa apakah tabel Snowflake memuatnya. Amazon S3 akan mengirim notifikasi SNS ke Snowpipe saat objek baru muncul di bucket S3.

Administrator Sistem, Pengembang

Sumber daya terkait

Informasi tambahan

Buat format file:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

Buat tahap eksternal:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

Buat tabel:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

Tampilkan tahapan:

SHOW STAGES;

Buat pipa:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

Tampilkan pipa:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

Buat integrasi penyimpanan:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

Contoh:

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

Untuk informasi selengkapnya tentang langkah ini, lihat Mengonfigurasi integrasi penyimpanan Snowflake untuk mengakses Amazon S3 dari dokumentasi Snowflake.

Jelaskan integrasi:

DESC INTEGRATION <integration_name>;

Kebijakan bucket S3:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }