Getting started with Aurora zero-ETL integrations
Before you create a zero-ETL integration, configure your Aurora DB cluster and your data warehouse with the required parameters and permissions. During setup, you'll complete the following steps:
After you complete these tasks, continue to Creating Aurora zero-ETL integrations with Amazon Redshift or Creating Aurora zero-ETL integrations with Amazon SageMaker Lakehouse.
You can use the AWS SDKs to automate the setup process for you. For more information, see Set up an integration using the AWS SDKs.
For Step 3, you can choose to create either a target data warehouse (Step 3a) or a target lakehouse (Step 3b), depending on your needs:
Step 1: Create a custom DB cluster parameter group
Aurora zero-ETL integrations require specific values for the DB cluster parameters that control replication. Specifically, Aurora MySQL requires enhanced binlog (aurora_enhanced_binlog), and Aurora PostgreSQL requires enhanced logical replication (aurora.enhanced_logical_replication).
To configure binary logging or logical replication, you must first create a custom DB cluster parameter group, and then associate it with the source DB cluster.
Aurora MySQL (aurora-mysql8.0 family):
- aurora_enhanced_binlog=1
- binlog_backup=0
- binlog_format=ROW
- binlog_replication_globaldb=0
- binlog_row_image=full
- binlog_row_metadata=full
In addition, make sure that the binlog_transaction_compression parameter is not set to ON, and that the binlog_row_value_options parameter is not set to PARTIAL_JSON.
For more information about Aurora MySQL enhanced binlog, see Setting up enhanced binlog for Aurora MySQL.
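If you're scripting the setup, the following is a minimal sketch using the AWS SDK for Python (Boto3) that creates an Aurora MySQL parameter group with these values; it mirrors the fuller script later in this topic, and my-source-param-group is a placeholder name.
import boto3

rds = boto3.client('rds')

# Create a custom cluster parameter group for the aurora-mysql8.0 family.
rds.create_db_cluster_parameter_group(
    DBClusterParameterGroupName='my-source-param-group',
    DBParameterGroupFamily='aurora-mysql8.0',
    Description='For Aurora MySQL zero-ETL integrations'
)

# Set the parameter values that zero-ETL integrations require.
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName='my-source-param-group',
    Parameters=[
        {'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot'},
        {'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot'}
    ]
)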
Aurora PostgreSQL (aurora-postgresql16 family):
- rds.logical_replication=1
- aurora.enhanced_logical_replication=1
- aurora.logical_replication_backup=0
- aurora.logical_replication_globaldb=0
When you enable enhanced logical replication (aurora.enhanced_logical_replication), all column values are always written to the write-ahead log (WAL), even if REPLICA IDENTITY FULL isn't enabled. This might increase IOPS for your source DB cluster.
If you enable or disable the aurora.enhanced_logical_replication DB cluster parameter, the primary DB instance invalidates all logical replication slots. This stops replication from the source to the target, and you must recreate the replication slots on the primary DB instance. To prevent interruptions, keep the parameter state consistent during replication.
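Before you attach the custom parameter group to a cluster, you can verify that it carries the expected values. The following is a minimal Boto3 sketch for the Aurora PostgreSQL group; my-source-param-group is a placeholder name, and for large parameter groups the response is paginated.
import boto3

rds = boto3.client('rds')

# Check the values set on the custom DB cluster parameter group. Follow the
# 'Marker' field in the response to retrieve additional pages if present.
wanted = {
    'rds.logical_replication',
    'aurora.enhanced_logical_replication',
    'aurora.logical_replication_backup',
    'aurora.logical_replication_globaldb'
}
response = rds.describe_db_cluster_parameters(
    DBClusterParameterGroupName='my-source-param-group'
)
for param in response['Parameters']:
    if param['ParameterName'] in wanted:
        print(param['ParameterName'], '=', param.get('ParameterValue'))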
Step 2: Select or create a source DB cluster
After you create a custom DB cluster parameter group, choose or create an Aurora DB cluster. This cluster will be the source of data replication to the target data warehouse. You can specify a DB cluster that uses provisioned DB instances or Aurora Serverless v2 DB instances as the source. For instructions to create a DB cluster, see Creating an Amazon Aurora DB cluster or Creating a DB cluster that uses Aurora Serverless v2.
The database must be running a supported DB engine version. For a list of supported versions, see Supported Regions and Aurora DB engines for zero-ETL integrations.
When you create the database, under Additional configuration, change the default DB cluster parameter group to the custom parameter group that you created in the previous step.
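If you create the source cluster with the AWS SDK rather than the console, the following Boto3 sketch shows the Aurora PostgreSQL case; it mirrors the fuller script later in this topic, and the cluster identifier, engine version, and credentials are placeholders.
import boto3

rds = boto3.client('rds')

# Create the source Aurora PostgreSQL DB cluster with the custom parameter group attached.
rds.create_db_cluster(
    DBClusterIdentifier='my-source-cluster',
    DBClusterParameterGroupName='my-source-param-group',
    Engine='aurora-postgresql',
    EngineVersion='16.4.aurora-postgresql',
    DatabaseName='mypostgresdb',
    MasterUsername='username',
    MasterUserPassword='Password01**'
)

# Add a DB instance to the cluster; the cluster isn't usable until it has at least one instance.
rds.create_db_instance(
    DBClusterIdentifier='my-source-cluster',
    DBInstanceIdentifier='my-source-cluster-instance',
    DBInstanceClass='db.r6g.2xlarge',
    Engine='aurora-postgresql'
)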
Step 3a: Create a target data warehouse
After you create your source DB cluster, you must create and configure a target data warehouse. The data warehouse must meet the following requirements:
For instructions to create a data warehouse, see Creating a cluster for provisioned clusters, or Creating a workgroup with a namespace for Redshift Serverless.
Enable case sensitivity on the data warehouse
For the integration to succeed, the case sensitivity parameter (enable_case_sensitive_identifier) must be enabled for the data warehouse. By default, case sensitivity is disabled on all provisioned clusters and Redshift Serverless workgroups.
To enable case sensitivity, perform the following steps depending on your data warehouse type:
- Provisioned cluster – To enable case sensitivity on a provisioned cluster, create a custom parameter group with the enable_case_sensitive_identifier parameter enabled. Then associate that parameter group with the cluster; a Boto3 sketch of these steps follows after this list. For instructions, see Managing parameter groups using the console or Configuring parameter values using the AWS CLI.
After you associate the custom parameter group with the cluster, remember to reboot the cluster.
- Serverless workgroup – To enable case sensitivity on a Redshift Serverless workgroup, you must use the AWS CLI. The Amazon Redshift console doesn't currently support modifying Redshift Serverless parameter values. Send the following update-workgroup request:
aws redshift-serverless update-workgroup \
  --workgroup-name target-workgroup \
  --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true
You don't need to reboot a workgroup after modifying its parameter values.
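For provisioned clusters, the following is a minimal Boto3 sketch of the steps described in the list above; the parameter group name my-target-param-group and cluster identifier my-target-cluster are placeholders, and the static parameter change only takes effect after the reboot.
import boto3

redshift = boto3.client('redshift')

# Create a custom Redshift parameter group with case sensitivity enabled.
redshift.create_cluster_parameter_group(
    ParameterGroupName='my-target-param-group',
    ParameterGroupFamily='redshift-1.0',
    Description='For Aurora zero-ETL integrations'
)
redshift.modify_cluster_parameter_group(
    ParameterGroupName='my-target-param-group',
    Parameters=[{'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true'}]
)

# Associate the parameter group with an existing provisioned cluster, then reboot
# the cluster so the change takes effect. In a real workflow, wait for the cluster
# to finish applying the modification before rebooting.
redshift.modify_cluster(
    ClusterIdentifier='my-target-cluster',
    ClusterParameterGroupName='my-target-param-group'
)
redshift.reboot_cluster(ClusterIdentifier='my-target-cluster')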
Configure authorization for the data warehouse
After you create a data warehouse, you must configure the source Aurora DB cluster as an authorized integration source. For instructions, see Configure authorization for your Amazon Redshift data warehouse.
Set up an integration using the AWS SDKs
Rather than setting up each resource manually, you can run the following Python script to automatically set up the required resources for you. The code example uses the AWS SDK for Python (Boto3) to create a source Amazon Aurora DB cluster and a target data warehouse, each with the required parameter values. It then waits for the databases to be available before creating a zero-ETL integration between them. You can comment out different functions depending on which resources you need to set up.
To install the required dependencies, run the following command:
pip install boto3
Within the script, optionally modify the names of the source, target, and parameter groups. The final function creates an integration named my-integration after the resources are set up.
- Aurora MySQL
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora MySQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-mysql8.0',
Description='For Aurora MySQL binary logging'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'aurora_enhanced_binlog',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_format',
'ParameterValue': 'ROW',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_image',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_metadata',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-mysql',
EngineVersion='8.0.mysql_aurora.3.05.2',
DatabaseName='myauroradb',
MasterUsername='username',
MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-mysql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora MySQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
MasterUsername='username',
MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
- Aurora PostgreSQL
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora PostgreSQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-postgresql16',
Description='For Aurora PostgreSQL logical replication'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'rds.logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.enhanced_logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-postgresql',
EngineVersion='16.4.aurora-postgresql',
DatabaseName='mypostgresdb',
MasterUsername='username',
MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-postgresql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora PostgreSQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
MasterUsername='username',
MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
Step 3b: Create an AWS Glue catalog for an Amazon SageMaker zero-ETL integration
When you create a zero-ETL integration with Amazon SageMaker Lakehouse, you must create an AWS Glue managed catalog in AWS Lake Formation. The target catalog must be an Amazon Redshift managed catalog. To create an Amazon Redshift managed catalog, first create the AWSServiceRoleForRedshift service-linked role. In the Lake Formation console, add AWSServiceRoleForRedshift as a read-only administrator.
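If you prefer to script this step, the following Boto3 sketch creates the AWSServiceRoleForRedshift service-linked role; adding the role as a read-only administrator is still done in the Lake Formation console. The error handling assumes IAM reports an already-existing service-linked role as InvalidInput.
import boto3
from botocore.exceptions import ClientError

iam = boto3.client('iam')

# Create the service-linked role that Amazon Redshift uses. If the role already
# exists in the account, treat that as success.
try:
    iam.create_service_linked_role(AWSServiceName='redshift.amazonaws.com')
    print('Created AWSServiceRoleForRedshift')
except ClientError as error:
    if error.response['Error']['Code'] == 'InvalidInput':
        print('AWSServiceRoleForRedshift already exists')
    else:
        raise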
For more information about the preceding tasks, see the following topics.
Configure permissions for the target AWS Glue catalog
Before you can create a target catalog for a zero-ETL integration, you must create a Lake Formation target creation role and an AWS Glue data transfer role. Use the Lake Formation target creation role to create the target catalog. When creating the target catalog, enter the Glue data transfer role in the IAM role field in the Access from engines section.
The target creation role must be a Lake Formation administrator and requires the following permissions.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "lakeformation:RegisterResource",
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutEncryptionConfiguration",
"iam:PassRole",
"glue:CreateCatalog",
"glue:GetCatalog",
"s3:PutBucketTagging",
"s3:PutLifecycleConfiguration",
"s3:PutBucketPolicy",
"s3:CreateBucket",
"redshift-serverless:CreateNamespace",
"s3:DeleteBucket",
"s3:PutBucketVersioning",
"redshift-serverless:CreateWorkgroup"
],
"Resource": [
"arn:aws:glue:*:account-id
:catalog",
"arn:aws:glue:*:account-id
:catalog/*",
"arn:aws:s3:::*",
"arn:aws:redshift-serverless:*:account-id
:workgroup/*",
"arn:aws:redshift-serverless:*:account-id
:namespace/*",
"arn:aws:iam::account-id
:role/GlueDataCatalogDataTransferRole"
]
}
]
}
The target creation role must have the following trust relationship.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::account-id
:user/Username"
},
"Action": "sts:AssumeRole"
}
]
}
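As a convenience, you can create the target creation role with the AWS SDK instead of the IAM console. The following Boto3 sketch assumes the trust policy and permissions policy shown above are saved as trust-policy.json and permissions-policy.json (with account-id and Username replaced with real values); the role and policy names are placeholders. Registering the role as a Lake Formation administrator is a separate step that the sketch doesn't cover.
import boto3

iam = boto3.client('iam')

# Create the Lake Formation target creation role using the trust policy above.
with open('trust-policy.json') as f:
    trust_policy = f.read()
role = iam.create_role(
    RoleName='zero-etl-target-creation-role',
    AssumeRolePolicyDocument=trust_policy,
    Description='Creates the target AWS Glue catalog for zero-ETL integrations'
)
print('Created role: ' + role['Role']['Arn'])

# Attach the permissions policy above as an inline policy.
with open('permissions-policy.json') as f:
    permissions_policy = f.read()
iam.put_role_policy(
    RoleName='zero-etl-target-creation-role',
    PolicyName='zero-etl-target-creation-permissions',
    PolicyDocument=permissions_policy
)
The same pattern applies to the Glue data transfer role described next, using its own permissions and trust policies.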
The Glue data transfer role is required for MySQL catalog operations and must have the following permissions.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DataTransferRolePolicy",
"Effect": "Allow",
"Action": [
"kms:GenerateDataKey",
"kms:Decrypt",
"glue:GetCatalog",
"glue:GetDatabase"
],
"Resource": [
"*"
]
}
]
}
The Glue data transfer role must have the following trust relationship.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"glue.amazonaws.com",
"redshift.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
Next steps
With a source Aurora DB cluster and an Amazon Redshift target data warehouse or Amazon SageMaker lakehouse, you can create a zero-ETL integration and replicate data. For instructions, see Creating Aurora zero-ETL integrations with Amazon Redshift.