Aurora零 ETL 整合入門 - Amazon Aurora

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Aurora零 ETL 整合入門

建立零 ETL 整合之前,請使用必要的參數和許可來設定 Aurora 資料庫叢集和資料倉儲。安裝期間,您將完成以下步驟:

完成這些任務後,請繼續 建立與 Amazon Redshift 的 Aurora 零 ETL 整合建立與 Lakehouse 的 Aurora零 ETL Amazon SageMaker 整合

您可以使用 AWS SDKs 為您自動化設定程序。如需詳細資訊,請參閱使用 AWS SDKs設定整合

提示

您可以在建立整合時讓 RDS 為您完成這些設定步驟,而不是手動執行這些步驟。若要立即開始建立整合,請參閱 建立與 Amazon Redshift 的 Aurora 零 ETL 整合

對於步驟 3,您可以選擇根據您的需求建立目標資料倉儲 (步驟 3a) 或目標湖房 (步驟 3b):

  • 如果您需要具有 SQL 型分析的傳統資料倉儲功能,請選擇資料倉儲。

  • 如果您需要機器學習功能,並想要將Amazon SageMaker湖房功能用於資料科學和 ML 工作流程,請選擇湖房。

步驟 1:建立自訂資料庫叢集參數群組。

Aurora 零 ETL 整合需要控制複寫之資料庫叢集參數的特定值。具體而言,Aurora MySQL 需要增強型 Binlog (aurora_enhanced_binlog),而 Aurora PostgreSQL 需要增強型邏輯複寫 (aurora.enhanced_logical_replication)。

若要設定二進位記錄或邏輯複寫,您必須先建立自訂資料庫叢集參數群組,然後將其與來源資料庫叢集建立關聯。

Aurora MySQL (aurora-mysql8.0 系列)

  • aurora_enhanced_binlog=1

  • binlog_backup=0

  • binlog_format=ROW

  • binlog_replication_globaldb=0

  • binlog_row_image=full

  • binlog_row_metadata=full

此外,請確定 binlog_transaction_compression 參數設定為 ON,且 binlog_row_value_options 參數設定為 PARTIAL_JSON

如需 Aurora MySQL 增強型 binlog 的詳細資訊,請參閱 設定 Aurora MySQL 的增強型 binlog

Aurora PostgreSQL (aurora-postgresql16 系列):

  • rds.logical_replication=1

  • aurora.enhanced_logical_replication=1

  • aurora.logical_replication_backup=0

  • aurora.logical_replication_globaldb=0

即使REPLICA IDENTITY FULL未啟用,啟用增強型邏輯複寫 (aurora.enhanced_logical_replication) 仍會一律將所有資料欄值寫入預寫日誌 (WAL)。這可能會增加來源資料庫叢集的 IOPS。

重要

如果您啟用或停用aurora.enhanced_logical_replication資料庫叢集參數,主要資料庫執行個體會讓所有邏輯複寫槽失效。這會停止從來源到目標的複寫,而且您必須在主要資料庫執行個體上重新建立複寫槽。為了防止中斷,請在複寫期間保持參數狀態一致。

步驟 2:選取或建立來源資料庫叢集

建立自訂資料庫叢集參數群組之後,請選擇或建立 Aurora 資料庫叢集。此叢集將是資料複寫至目標資料倉儲的來源。您可以指定使用佈建資料庫執行個體或資料庫執行個體做為來源的Aurora Serverless v2資料庫叢集。如需建立資料庫叢集的指示,請參閱 建立 Amazon Aurora 資料庫叢集建立使用 的資料庫叢集 Aurora Serverless v2

資料庫必須執行支援的資料庫引擎版本。如需支援的版本的清單,請參閱零 ETL 整合支援的 區域和 Aurora 資料庫引擎

當您建立資料庫時,在其他組態下,將預設資料庫叢集參數群組變更為您在上一個步驟中建立的自訂參數群組。

注意

如果您在建立叢集之後將參數群組與資料庫叢集建立關聯,則必須先重新啟動叢集中的主要資料庫執行個體,才能套用變更,才能建立零 ETL 整合。如需相關指示,請參閱重新啟動 Amazon Aurora 資料庫叢集或 Amazon Aurora 資料庫執行個體

步驟 3a:建立目標資料倉儲

建立來源資料庫叢集之後,您必須建立和設定目標資料倉儲。資料倉儲必須符合下列需求:

  • 使用具有至少兩個節點的 RA3 節點類型,或 Redshift Serverless。

  • 已加密 (如果使用已佈建的叢集)。如需詳細資訊,請參閱 Amazon Redshift 資料庫加密

如需建立資料倉儲的指示,請參閱建立叢集 (適用於佈建的叢集),或使用命名空間建立工作群組 (適用於 Redshift Serverless)。

在資料倉儲上啟用區分大小寫

若要成功整合,必須為資料倉儲啟用區分大小寫參數 (enable_case_sensitive_identifier)。依預設,所有佈建的叢集和 Redshift Serverless 工作群組上都會停用區分大小寫。

若要啟用區分大小寫,請根據您的資料倉儲類型執行下列步驟:

  • 佈建的叢集 – 若要在佈建的叢集上啟用區分大小寫,請建立已啟用 enable_case_sensitive_identifier 參數的自訂參數群組。接著,將該參數群組與叢集建立關聯。如需指示,請參閱使用主控台管理參數群組使用 AWS CLI設定參數值

    注意

    在將自訂參數群組與叢集建立關聯之後,請記得重新啟動該叢集。

  • 無伺服器工作群組 - 若要在 Redshift Serverless 工作群組上啟用區分大小寫,您必須使用 AWS CLI。Amazon Redshift 主控台目前不支援修改 Redshift Serverless 參數值。傳送下列 update-workgroup 請求:

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    在修改工作群組的參數值之後,您不需要重新啟動該工作群組。

設定資料倉儲的授權

建立資料倉儲之後,您必須將來源 Aurora 資料庫叢集設定為授權的整合來源。如需指示,請參閱設定 Amazon Redshift 資料倉儲的授權

使用 AWS SDKs設定整合

您可以執行下列 Python 指令碼來自動為您設定所需的資源,而不是手動設定每個資源。程式碼範例使用 適用於 Python (Boto3) 的 AWS SDK建立來源 Amazon Aurora 資料庫叢集和目標資料倉儲,每個都具有必要的參數值。然後,在資料庫之間建立零 ETL 整合之前,它會等待資料庫可用。您可以根據您需要設定的資源來註解不同的函數。

若要安裝所需的相依性,請執行下列命令:

pip install boto3 pip install time

在指令碼中,選擇性地修改來源、目標和參數群組的名稱。最終函數會在設定資源my-integration後建立名為 的整合。

Aurora MySQL
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora MySQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-mysql8.0', Description='For Aurora MySQL binary logging' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-mysql', EngineVersion='8.0.mysql_aurora.3.05.2', DatabaseName='myauroradb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-mysql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()
Aurora PostgreSQL
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora PostgreSQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-postgresql16', Description='For Aurora PostgreSQL logical replication' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'rds.logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.enhanced_logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.logical_replication_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.logical_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-postgresql', EngineVersion='16.4.aurora-postgresql', DatabaseName='mypostgresdb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-postgresql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora PostgreSQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()

步驟 3b:建立Amazon SageMaker零 ETL 整合的 AWS Glue 目錄

建立與 Lakehouse Amazon SageMaker 的零 ETL 整合時,您必須在其中建立 AWS Glue 受管目錄 AWS Lake Formation。目標目錄必須是 Amazon Redshift 受管目錄。若要建立 Amazon Redshift 受管目錄,請先建立AWSServiceRoleForRedshift服務連結角色。在 Lake Formation 主控台中,將 新增AWSServiceRoleForRedshift為唯讀管理員。

如需先前任務的詳細資訊,請參閱下列主題。

設定目標 AWS Glue 目錄的許可

在建立零 ETL 整合的目標目錄之前,您必須建立 Lake Formation 目標建立角色和 AWS Glue 資料傳輸角色。使用 Lake Formation 目標建立角色來建立目標目錄。建立目標目錄時,請在從引擎存取區段IAM 角色欄位中輸入 Glue 資料傳輸角色。

目標建立角色必須是 Lake Formation 管理員,且需要下列許可。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "lakeformation:RegisterResource", "Resource": "*" }, { "Sid": "VisualEditor1", "Effect": "Allow", "Action": [ "s3:PutEncryptionConfiguration", "iam:PassRole", "glue:CreateCatalog", "glue:GetCatalog", "s3:PutBucketTagging", "s3:PutLifecycleConfiguration", "s3:PutBucketPolicy", "s3:CreateBucket", "redshift-serverless:CreateNamespace", "s3:DeleteBucket", "s3:PutBucketVersioning", "redshift-serverless:CreateWorkgroup" ], "Resource": [ "arn:aws:glue:*:account-id:catalog", "arn:aws:glue:*:account-id:catalog/*", "arn:aws:s3:::*", "arn:aws:redshift-serverless:*:account-id:workgroup/*", "arn:aws:redshift-serverless:*:account-id:namespace/*", "arn:aws:iam::account-id:role/GlueDataCatalogDataTransferRole" ] } ] }

目標建立角色必須具有下列信任關係。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "glue.amazonaws.com" }, "Action": "sts:AssumeRole" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::account-id:user/Username" }, "Action": "sts:AssumeRole" } ] }

MySQL 目錄操作需要 Glue 資料傳輸角色,且必須具有下列許可。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "DataTransferRolePolicy", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt", "glue:GetCatalog", "glue:GetDatabase" ], "Resource": [ "*" ] } ] }

Glue 資料傳輸角色必須具有下列信任關係。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "glue.amazonaws.com", "redshift.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

後續步驟

使用來源 Aurora 資料庫叢集和 Amazon Redshift Amazon SageMaker 目標資料倉儲或 Lakehouse,您可以建立零 ETL 整合並複寫資料。如需說明,請參閱建立與 Amazon Redshift 的 Aurora 零 ETL 整合