Conceitos básicos sobre Integrações ETL zero do Aurora
Antes de criar uma Integração ETL zero, configure o cluster de banco de dados do Aurora e o data warehouse com as permissões e os parâmetros necessários. Durante a configuração, você realizará as seguintes etapas:
Depois de concluir essas tarefas, prossiga para Criar integrações ETL zero do Amazon Aurora com o Amazon Redshift ou Criar Integrações ETL zero entre o Aurora e um lakehouse do Amazon SageMaker.
É possível usar os SDKs da AWS para automatizar o processo de configuração. Para obter mais informações, consulte Configurar uma integração usando SDKs da AWS.
Para a Etapa 3, você pode optar por criar um data warehouse de destino (Etapa 3a) ou um lakehouse de destino (Etapa 3b), dependendo de suas necessidades:
-
Escolha um data warehouse se precisar de recursos tradicionais de armazenamento de dados com analytics baseada em SQL.
-
Escolha um lakehouse do Amazon SageMaker AI se precisar de recursos de machine learning e quiser usar os recursos do lakehouse para fluxos de trabalho de ciência de dados e ML.
Etapa 1: criar um grupo de parâmetros de cluster de banco de dados personalizado
As Integrações ETL zero do Aurora exigem valores específicos para os parâmetros de cluster de banco de dados do Aurora que controlam a replicação. Mais especificamente, o Aurora MySQL requer o log binário aprimorado (aurora_enhanced_binlog) e o Aurora PostgreSQL requer replicação lógica aprimorada (aurora.enhanced_logical_replication).
Para configurar o registro em log binário ou a replicação lógica, primeiro é necessário criar um grupo de parâmetros de cluster de banco de dados personalizado e, depois, associá-lo ao cluster de banco de dados de origem.
Aurora MySQL (família aurora-mysql):
Além disso, verifique se o parâmetro binlog_transaction_compression não está definido como ON e se o parâmetrobinlog_row_value_options não está definido como PARTIAL_JSON.
Para ter mais informações sobre o log binário aprimorado do Aurora MySQL, consulte Configurar o log binário avançado para Aurora MySQL.
Aurora PostgreSQL (família aurora-postgresql):
-
rds.logical_replication=1
-
aurora.enhanced_logical_replication=1
-
aurora.logical_replication_backup=0
-
aurora.logical_replication_globaldb=0
Habilitar a replicação lógica aprimorada (aurora.enhanced_logical_replication) sempre gravará todos os valores de coluna no log de gravação antecipada (WAL), mesmo que REPLICA IDENTITY FULL não esteja habilitada. Isso pode aumentar as IOPS do cluster de banco de dados de origem.
Se você desabilitar o parâmetro de cluster de banco de dados aurora.enhanced_logical_replication a instância de banco de dados primária invalidará todos os slots de replicação lógica. Isso interrompe a replicação da origem no destino, e você deve recriar os slots de replicação na instância de banco de dados primária. Para evitar interrupções, mantenha o estado do parâmetro consistente durante a replicação.
Etapa 2: Selecionar ou criar um cluster de banco de dados de origem
Depois de criar um grupo de parâmetros de cluster de banco de dados personalizado, escolha ou crie um cluster de banco de dados do Aurora. Esse cluster será a origem da replicação de dados para o data warehouse de destino. Você pode especificar um cluster de banco de dados que usa instâncias de banco de dados provisionadas ou instâncias de banco de dados do Aurora Serverless v2. Consulte instruções para criar um cluster de banco de dados, consulte Criar um cluster de bancos de dados do Amazon Aurora ou Criar um cluster de banco de dados que usa o Aurora Serverless v2.
O banco de dados deve estar executando uma versão de mecanismo de banco de dados compatível. Para conferir uma lista de versões compatíveis, consulte Regiões e mecanismos de banco de dados do Aurora compatíveis com Integrações ETL zero.
Ao criar o banco de dados, em Configuração adicional, altere o grupo de parâmetros de cluster de banco de dados padrão para o grupo de parâmetros personalizado que você criou na etapa anterior.
Etapa 3a: criar um data warehouse de destino
Depois de criar o cluster de banco de dados, será necessário criar e configurar um data warehouse de destino. O data warehouse deve cumprir os seguintes requisitos:
Para obter instruções sobre como criar um data warehouse, consulte Criar um cluster para clusters provisionados ou Criar um grupo de trabalho com um namespace para o Redshift Serverless.
Ative a distinção entre maiúsculas e minúsculas no data warehouse
Para que a integração seja bem-sucedida, o parâmetro de diferenciação de maiúsculas e minúsculas (enable_case_sensitive_identifier) deve estar ativado para o data warehouse. Por padrão, a distinção entre maiúsculas e minúsculas é desativada em todos os clusters provisionados e grupos de trabalho do Redshift Serverless.
Para ativar a distinção entre maiúsculas e minúsculas, execute as seguintes etapas, dependendo do tipo de data warehouse:
-
Cluster provisionado: para ativar a distinção entre maiúsculas e minúsculas em um cluster provisionado, crie um grupo de parâmetros personalizado com o parâmetro enable_case_sensitive_identifier ativado. Em seguida, associe o grupo de parâmetros ao cluster. Para obter instruções, consulte Gerenciar grupos de parâmetros usando o console ou Configurar valores de parâmetros usando a AWS CLI.
Lembre-se de reinicializar o cluster depois de associar o grupo de parâmetros personalizado a ele.
-
Grupo de trabalho de tecnologia sem servidor: para ativar a distinção entre maiúsculas e minúsculas em um grupo de trabalho do Redshift Serverless, você deve usar a AWS CLI. Atualmente, o console do Amazon Redshift não é compatível com a modificação dos valores dos parâmetros do Redshift Serverless. Envie a seguinte solicitação de atualização do grupo de trabalho:
aws redshift-serverless update-workgroup \
--workgroup-name target-workgroup \
--config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true
Não é necessário reinicializar um grupo de trabalho após modificar seus valores de parâmetros.
Configurar a autorização para o data warehouse
Depois de criar um data warehouse, você deve configurar o cluster de banco de dados do Aurora de origem como uma origem de integração autorizada. Para obter instruções, consulte Configurar a autorização para o data warehouse do Amazon Redshift.
Configurar uma integração usando SDKs da AWS
Em vez de configurar cada recurso manualmente, é possível executar o script Python a seguir para configurar automaticamente os recursos necessários. O exemplo de código usa o AWS SDK para Python (Boto3) para criar um cluster de banco de dados do Amazon Aurora de origem e um data warehouse de destino, cada um com os valores de parâmetro necessários. Depois, ele espera que os bancos de dados estejam disponíveis antes de criar uma integração ETL zero entre eles. É possível comentar diferentes funções, dependendo dos recursos que você precisa configurar.
Execute os comandos a seguir para instalar as dependências necessárias:
pip install boto3
pip install time
No script, modifique opcionalmente os nomes dos grupos de parâmetros, a origem e o destino. A função final cria uma integração denominada my-integration após a configuração dos recursos.
- Aurora MySQL
-
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora MySQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-mysql8.0',
Description='For Aurora MySQL binary logging'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'aurora_enhanced_binlog',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_format',
'ParameterValue': 'ROW',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_image',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_metadata',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-mysql',
EngineVersion='8.0.mysql_aurora.3.05.2',
DatabaseName='myauroradb',
MasterUsername='username',
MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-mysql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora MySQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
MasterUsername='username',
MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
- Aurora PostgreSQL
-
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora PostgreSQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-postgresql16',
Description='For Aurora PostgreSQL logical replication'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'rds.logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.enhanced_logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-postgresql',
EngineVersion='16.4.aurora-postgresql',
DatabaseName='mypostgresdb',
MasterUsername='username',
MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-postgresql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora PostgreSQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
MasterUsername='username',
MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
Etapa 3b: criar um catálogo do AWS Glue para uma Integração ETL zero do Amazon SageMaker AI
Ao criar uma Integração ETL zero com um lakehouse do Amazon SageMaker AI, você deve criar um catálogo gerenciado pelo AWS Glue no AWS Lake Formation. O catálogo de destino deve ser um catálogo gerenciado pelo Amazon Redshift. Para criar um catálogo gerenciado pelo Amazon Redshift, primeiro crie um perfil vinculado ao serviço AWSServiceRoleForRedshift. No console do Lake Formation, adicione o AWSServiceRoleForRedshift como administrador somente para leitura.
Para ter mais informações sobre as tarefas anteriores, consulte os tópicos a seguir.
Configurar permissões para o catálogo do AWS Glue
Antes de criar um catálogo de destino para uma Integração ETL zero, você deve criar o perfil de criação de destino do Lake Formation e o perfil de transferência de dados do AWS Glue. Use o perfil de criação de destino do Lake Formation para criar o catálogo de destino. Ao criar o catálogo de destino, insira um perfil de transferência de dados do Glue no campo Perfil do IAM na seção Acesso por meio de mecanismos.
O perfil de criação de destino deve ser administrador do Lake Formation e requer as permissões a seguir.
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "lakeformation:RegisterResource",
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutEncryptionConfiguration",
"iam:PassRole",
"glue:CreateCatalog",
"glue:GetCatalog",
"s3:PutBucketTagging",
"s3:PutLifecycleConfiguration",
"s3:PutBucketPolicy",
"s3:CreateBucket",
"redshift-serverless:CreateNamespace",
"s3:DeleteBucket",
"s3:PutBucketVersioning",
"redshift-serverless:CreateWorkgroup"
],
"Resource": [
"arn:aws:glue:*:111122223333:catalog",
"arn:aws:glue:*:111122223333:catalog/*",
"arn:aws:s3:::*",
"arn:aws:redshift-serverless:*:111122223333:workgroup/*",
"arn:aws:redshift-serverless:*:111122223333:namespace/*",
"arn:aws:iam::111122223333:role/GlueDataCatalogDataTransferRole"
]
}
]
}
O perfil de criação de destino deve ter a relação de confiança a seguir.
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::111122223333:user/Username"
},
"Action": "sts:AssumeRole"
}
]
}
O perfil de transferência de dados do Glue é necessário para operações de catálogo do MySQL e deve ter as permissões a seguir.
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Sid": "DataTransferRolePolicy",
"Effect": "Allow",
"Action": [
"kms:GenerateDataKey",
"kms:Decrypt",
"glue:GetCatalog",
"glue:GetDatabase"
],
"Resource": [
"*"
]
}
]
}
O perfil de transferência de dados do Glue deve ter a relação de confiança a seguir.
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"glue.amazonaws.com",
"redshift.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
Próximas etapas
Com um cluster de banco de dados do Aurora de origem e um data warehouse de destino do Amazon Redshift ou um lakehouse do Amazon SageMaker AI, você pode criar uma Integração ETL zero e começar a replicar dados. Para instruções, consulte Criar integrações ETL zero do Amazon Aurora com o Amazon Redshift.