Automatização da ingestão de fluxos de dados em um banco de dados do Snowflake usando o Snowflake Snowpipe, o Amazon S3, o Amazon SNS e o Amazon Data Firehose - Recomendações da AWS

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Automatização da ingestão de fluxos de dados em um banco de dados do Snowflake usando o Snowflake Snowpipe, o Amazon S3, o Amazon SNS e o Amazon Data Firehose

Bikash Chandra Rout, Amazon Web Services

Resumo

Este padrão descreve como você pode usar serviços na Nuvem Amazon Web Services (AWS) para processar um fluxo contínuo de dados e carregá-lo em um banco de dados do Snowflake. O padrão usa o Amazon Data Firehose para fornecer os dados ao Amazon Simple Storage Service (Amazon S3), o Amazon Simple Notification Service (Amazon SNS) para enviar notificações quando novos dados são recebidos, e o Snowflake Snowpipe para carregar os dados em um banco de dados do Snowflake.

Com este padrão, é possível disponibilizar dados gerados de forma contínua para análise em segundos, eliminar a necessidade de vários comandos COPY manuais e contar com suporte total a dados semiestruturados durante o carregamento.

Pré-requisitos e limitações

Pré-requisitos

  • Um ativo Conta da AWS.

  • Uma fonte de dados que envia continuamente informações para um fluxo de entrega do Firehose.

  • Um bucket do S3 existente, responsável por receber os dados do fluxo de entrega do Firehose.

  • Uma conta ativa do Snowflake.

Limitações

  • O Snowflake Snowpipe não estabelece conexão direta com o Firehose.

Arquitetura

Os dados ingeridos pelo Firehose são enviados para o Amazon S3, o Amazon SNS, o Snowflake Snowpipe e o banco de dados do Snowflake.

Pilha de tecnologia

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake

  • Banco de dados do Snowflake

Ferramentas

  • O Amazon Data Firehose é um serviço totalmente gerenciado para fornecer dados de streaming em tempo real para destinos como Amazon S3, Amazon Redshift, OpenSearch Amazon Service, Splunk e qualquer endpoint HTTP personalizado ou endpoints HTTP de propriedade de provedores de serviços terceirizados compatíveis.

  • O Amazon Simple Storage Service (Amazon S3) é um serviço de armazenamento para a internet.

  • O Amazon Simple Notification Service (Amazon SNS) é um serviço da Web que coordena e gerencia a entrega ou o envio de mensagens a endpoints ou clientes inscritos.

  • Snowflake — O Snowflake é um data warehouse analítico fornecido como (SaaS). Software-as-a-Service

  • Snowflake Snowpipe: o Snowpipe carrega dados dos arquivos assim que eles estão disponíveis em um estágio do Snowflake.

Épicos

TarefaDescriptionHabilidades necessárias

Crie um arquivo CSV no Snowflake.

Faça login no Snowflake e execute o comando CREATE FILE FORMAT para criar um arquivo CSV com um delimitador de campo especificado. Para obter mais informações sobre este e outros comandos do Snowflake, consulte a seção Informações adicionais.

Desenvolvedor

Crie um estágio externo do Snowflake.

Execute o comando CREATE STAGE para criar um estágio externo no Snowflake que faça referência ao arquivo CSV criado anteriormente. Importante: você precisará da URL do bucket do S3, da sua chave de AWS acesso e da sua chave de acesso AWS secreta. Execute o comando SHOW STAGES para verificar se o estágio do Snowflake foi criado.

Desenvolvedor

Crie a tabela de destino do Snowflake.

Execute o comando CREATE TABLE para criar a tabela do Snowflake.

Desenvolvedor

Crie um pipe.

Execute o comando CREATE PIPE e certifique-se de que o valor auto_ingest=true esteja incluído no comando. Execute o comando SHOW PIPES para verificar se o pipe foi criado. Copie e salve o valor da coluna notification_channel. Esse valor será usado para configurar notificações de eventos do Amazon S3.

Desenvolvedor
TarefaDescriptionHabilidades necessárias

Crie uma política de ciclo de vida de 30 dias para o bucket do S3.

Faça login no Console de gerenciamento da AWS e abra o console do Amazon S3. Selecione o bucket do S3 que contém os dados do Firehose. Em seguida, selecione a guia Gerenciamento no bucket do S3 e escolha Adicionar regra de ciclo de vida. Insira um nome para sua regra na caixa de diálogo Regra de ciclo de vida e configure uma regra de ciclo de vida de 30 dias para seu bucket. Para obter ajuda com esse e outros artigos, consulte a seção Recursos relacionados.

Administrador do sistema, desenvolvedor

Crie uma política do IAM para o bucket do S3.

Abra o console AWS Identity and Access Management (IAM) e escolha Políticas. Escolha Criar política e escolha a guia JSON. Copie e cole a política apresentada na seção Informações adicionais no campo JSON. Esta política concederá permissões PutObject e DeleteObject, assim como permissões GetObject, GetObjectVersion e ListBucket. Selecione Revisar política, insira um nome para a política e, em seguida, clique em Criar política.

Administrador do sistema, desenvolvedor

Atribua a política a um perfil do IAM.

Abra o console do IAM, escolha Perfis e, em seguida, selecione Criar perfil. Selecione Outra conta da AWS como a entidade confiável. Insira sua Conta da AWS ID e escolha Exigir ID externa. Insira um ID de espaço reservado que você alterará posteriormente. Selecione Próximo e atribua a política do IAM que você criou anteriormente. Então, crie o perfil do IAM.

Administrador do sistema, desenvolvedor

Copie o nome do recurso da Amazon (ARN) do perfil do IAM.

Abra o console do IAM e selecione Perfis. Escolha o perfil do IAM que você criou anteriormente e, em seguida, copie e armazene o ARN do perfil.

Administrador do sistema, desenvolvedor
TarefaDescriptionHabilidades necessárias

Crie uma integração de armazenamento no Snowflake.

Faça login no Snowflake e execute o comando CREATE STORAGE INTEGRATION. Isso modificará o relacionamento confiável, concederá acesso ao Snowflake e fornecerá a ID externa do seu estágio do Snowflake.

Administrador do sistema, desenvolvedor

Recupere o perfil do IAM da conta do Snowflake.

Execute o comando DESC INTEGRATION para recuperar o ARN do perfil do IAM.

Importante

<integration_ name> corresponde ao nome da integração de armazenamento do Snowflake criada por você anteriormente.

Administrador do sistema, desenvolvedor

Registre os valores de duas colunas.

Copie e salve os valores das colunas storage_aws_iam_user_arn e storage_aws_external_id.

Administrador do sistema, desenvolvedor
TarefaDescriptionHabilidades necessárias

Modifique a política de perfil do IAM.

Abra o console do IAM e selecione Funções. Selecione o perfil do IAM que você criou anteriormente e escolha a guia Relações de confiança. Selecione Editar relação de confiança. Substitua snowflake_external_id pelo valor de storage_aws_external_id, que foi copiado anteriormente. Substitua snowflake_user_arn pelo valor de storage_aws_iam_user_arn, que foi copiado anteriormente. Em seguida, escolha Atualizar política de confiança.

Administrador do sistema, desenvolvedor
TarefaDescriptionHabilidades necessárias

Ative as notificações de eventos para o bucket do S3.

Abra o console do Amazon S3 e selecione seu bucket. Selecione Propriedades e, em Configurações avançadas, escolha Eventos. Selecione Adicionar notificação e insira um nome para esse evento. Se você não inserir um nome, um Identificador Exclusivo Globalmente (GUID) será usado.

Administrador do sistema, desenvolvedor

Configurar notificações do Amazon SNS para o bucket do S3.

Em Eventos, escolha ObjectCreate (Tudo) e, em seguida, escolha SQS Queue na lista suspensa Enviar para. Na lista SNS, selecione Adicionar ARN da fila do SQS e cole o valor notification_channel que você copiou anteriormente. Em seguida, escolha Salvar.

Administrador do sistema, desenvolvedor

Assinar a fila do Snowflake SQS; para um tópico do SNS.

Assinar a fila do Snowflake SQS; para um tópico do SNS que você criou. Para obter ajuda com esta etapa, consulte a seção Recursos relacionados.

Administrador do sistema, desenvolvedor
TarefaDescriptionHabilidades necessárias

Verifique e teste o Snowpipe.

Faça login no Snowflake e abra o estágio do Snowflake. Coloque os arquivos em seu bucket do S3 e verifique se a tabela do Snowflake os carrega. O Amazon S3 enviará notificações do SNS para o Snowpipe quando novos objetos aparecerem no bucket do S3.

Administrador do sistema, desenvolvedor

Recursos relacionados

Mais informações

Criar um formato de arquivo:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

Criar um estágio externo:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

Criar uma tabela:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

Exibir etapas:

SHOW STAGES;

Criar um pipe:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

Exibir pipes:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

Criar uma integração de armazenamento:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

Exemplo:

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

Para obter mais informações sobre essa etapa, consulte Configuração de uma integração de armazenamento do Snowflake para acessar o Amazon S3 na documentação do Snowflake.

Descreva uma integração:

DESC INTEGRATION <integration_name>;

Política de bucket do S3:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }