Aurora ゼロ ETL 統合の開始方法
ゼロ ETL 統合を作成する前に、必要なパラメータとアクセス許可で Aurora DB クラスターとデータウェアハウスを設定します。セットアップ時には、以下の手順を完了します。
これらのタスクが完了したら、「Amazon Redshift との Amazon ゼロ ETL 統合の作成」または「Amazon SageMaker Lakehouse との Aurora ゼロ ETL 統合の作成」に進みます。
AWS SDK を使用して、セットアッププロセスを自動化できます。詳細については、「AWS を使用して統合をセットアップする」を参照してください。
ステップ 3 では、必要に応じて、ターゲットデータウェアハウス (ステップ 3a) またはターゲットレイクハウス (ステップ 3b) のいずれかを作成できます。
ステップ 1: カスタム DB クラスターのパラメータグループを作成する
Aurora ゼロ ETL 統合には、レプリケーションを制御する DB クラスターパラメータに特定の値が必要です。具体的には、Aurora MySQL には拡張バイナリログ (aurora_enhanced_binlog) が必要であり、Aurora PostgreSQL には拡張論理レプリケーション (aurora.enhanced_logical_replication) が必要です。
バイナリロギングまたは論理レプリケーションを設定するには、まずカスタム DB クラスターパラメータグループを作成し、それをソース DB クラスターに関連付ける必要があります。
Aurora MySQL (aurora-mysql8.0 ファミリー):
さらに、binlog_transaction_compressionパラメータが ON に設定されていないこと、および binlog_row_value_options パラメータが PARTIAL_JSON に設定されていないことを確認してください。
Aurora MySQL 拡張バイナリログの詳細については、「Aurora MySQL の拡張バイナリログの設定」を参照してください。
Aurora PostgreSQL (aurora-postgresql16 ファミリー):
-
rds.logical_replication=1
-
aurora.enhanced_logical_replication=1
-
aurora.logical_replication_backup=0
-
aurora.logical_replication_globaldb=0
拡張論理レプリケーション (aurora.enhanced_logical_replication) を有効にすると、REPLICA IDENTITY FULL が有効になっていない場合でも、すべての列値が常に書き込み先行ログ (WAL) に書き込まれます。これにより、ソース DB クラスターの IOPS が向上する場合があります。
aurora.enhanced_logical_replication DB クラスターパラメータを有効または無効にすると、プライマリ DB インスタンスはすべての論理レプリケーションスロットを無効にします。これにより、ソースからターゲットへのレプリケーションが停止されるため、プライマリ DB インスタンスでレプリケーションスロットを再作成する必要があります。中断を防ぐため、レプリケーション中はパラメータを一貫した状態に保ちます。
ステップ 2: ソース DB クラスターを選択または作成する
カスタム DB クラスターのパラメータグループを作成したら、Aurora DB クラスターを選択または作成します。このクラスターは、ターゲットデータウェアハウスへのデータレプリケーションのソースになります。プロビジョニングされた DB インスタンスまたは Aurora Serverless v2 DB インスタンスをソースとして使用する DB クラスターを指定できます。DB クラスターの作成手順については、「Amazon Aurora DB クラスターの作成」または「Aurora Serverless v2 を使用する DB クラスターの作成」を参照してください。。
データベースは、サポートされている DB エンジンバージョンを実行している必要があります。サポートされているバージョンのリストについては、「ゼロ ETL 統合でサポートされているリージョンと Aurora DB エンジン」を参照してください。
データベースを作成するには、[追加設定] で、デフォルトの DB クラスターパラメータグループを、前のステップで作成したカスタムパラメータグループに変更します。
ステップ 3a: ターゲットデータウェアハウスを作成する
ソースDB クラスターを作成した後、ターゲットのデータウェアハウスを作成して設定する必要があります。データウェアハウスは、以下の要件を満たしている必要があります。
データウェアハウスを作成する手順については、プロビジョニングされたクラスター用の「クラスターの作成」またはRedshift Serverless 用の「名前空間を使用したワークグループの作成」を参照してください。
データウェアハウスで大文字と小文字の区別を有効にします。
統合を正常に行うには、データウェアハウスで大文字と小文字を区別するパラメータ (enable_case_sensitive_identifier) を有効にする必要があります。デフォルトでは、プロビジョニング済みクラスターと Redshift Serverless ワークグループの大文字と小文字の区別は無効になっています。
大文字と小文字の区別を有効にするには、データウェアハウスのタイプに応じて以下の手順を実行します。
-
プロビジョニングされたクラスター — プロビジョニングされたクラスターで大文字と小文字の区別を有効にするには、enable_case_sensitive_identifier パラメータを有効にしたカスタムパラメータグループを作成します。次に、そのパラメータグループとクラスターを関連付けます。手順については、「コンソールを使用したパラメータグループの管理」または「AWS CLI を使用したパラメータ値の設定」を参照してください。
クラスターにパラメータグループを関連付けたら、クラスターを再起動することを忘れないでください。
-
Serverless ワークグループ — Redshift Serverless ワークグループで大文字と小文字の区別を有効にするには、AWS CLI を使用する必要があります。Amazon Redshift コンソールは現在、Redshift Serverless パラメータ値の変更をサポートしていません。次の update-workgroup リクエストを送信します。
aws redshift-serverless update-workgroup \
--workgroup-name target-workgroup \
--config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true
パラメータ値を変更した後、ワークグループを再起動する必要はありません。
データウェアハウスの認可を設定します。
データウェアハウスを作成したら、ソース Aurora DB クラスターを承認済みの統合ソースとして設定する必要があります。手順については、「Amazon Redshift データウェアハウスの認可を設定する」を参照してください。
AWS を使用して統合をセットアップする
各リソースを手動でセットアップするのではなく、次の Python スクリプトを実行して、必要なリソースを自動的にセットアップできます。このコード例では AWS SDK for Python (Boto3) を使用してソース Amazon Aurora DB クラスターとターゲットデータウェアハウスを作成し、それぞれに必要なパラメータ値を指定します。次に、データベースが使用可能になるまで待ってから、データベース間にゼロ ETL 統合を作成します。設定する必要があるリソースに応じて、さまざまな関数をコメントアウトできます。
必要な従属関係をインストールには、次のコマンドを実行します。
pip install boto3
pip install time
スクリプト内で、オプションでソース、ターゲット、パラメータグループの名前を変更します。最後の関数は、リソースのセットアップ後に my-integration という名前の統合を作成します。
- Aurora MySQL
-
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora MySQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-mysql8.0',
Description='For Aurora MySQL binary logging'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'aurora_enhanced_binlog',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_format',
'ParameterValue': 'ROW',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_image',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'binlog_row_metadata',
'ParameterValue': 'full',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-mysql',
EngineVersion='8.0.mysql_aurora.3.05.2',
DatabaseName='myauroradb',
MasterUsername='username',
MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-mysql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora MySQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
MasterUsername='username',
MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
- Aurora PostgreSQL
-
import boto3
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
rds = boto3.client('rds')
redshift = boto3.client('redshift')
sts = boto3.client('sts')
source_cluster_name = 'my-source-cluster' # A name for the source cluster
source_param_group_name = 'my-source-param-group' # A name for the source parameter group
target_cluster_name = 'my-target-cluster' # A name for the target cluster
target_param_group_name = 'my-target-param-group' # A name for the target parameter group
def create_source_cluster(*args):
"""Creates a source Aurora PostgreSQL DB cluster"""
response = rds.create_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
DBParameterGroupFamily='aurora-postgresql16',
Description='For Aurora PostgreSQL logical replication'
)
print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName'])
response = rds.modify_db_cluster_parameter_group(
DBClusterParameterGroupName=source_param_group_name,
Parameters=[
{
'ParameterName': 'rds.logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.enhanced_logical_replication',
'ParameterValue': '1',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_backup',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
},
{
'ParameterName': 'aurora.logical_replication_globaldb',
'ParameterValue': '0',
'ApplyMethod': 'pending-reboot'
}
]
)
print('Modified source parameter group: ' + response['DBClusterParameterGroupName'])
response = rds.create_db_cluster(
DBClusterIdentifier=source_cluster_name,
DBClusterParameterGroupName=source_param_group_name,
Engine='aurora-postgresql',
EngineVersion='16.4.aurora-postgresql',
DatabaseName='mypostgresdb',
MasterUsername='username',
MasterUserPassword='Password01**'
)
print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier'])
source_arn = (response['DBCluster']['DBClusterArn'])
create_target_cluster(target_cluster_name, source_arn, target_param_group_name)
response = rds.create_db_instance(
DBInstanceClass='db.r6g.2xlarge',
DBClusterIdentifier=source_cluster_name,
DBInstanceIdentifier=source_cluster_name + '-instance',
Engine='aurora-postgresql'
)
return(response)
def create_target_cluster(target_cluster_name, source_arn, target_param_group_name):
"""Creates a target Redshift cluster"""
response = redshift.create_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
ParameterGroupFamily='redshift-1.0',
Description='For Aurora PostgreSQL zero-ETL integrations'
)
print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName'])
response = redshift.modify_cluster_parameter_group(
ParameterGroupName=target_param_group_name,
Parameters=[
{
'ParameterName': 'enable_case_sensitive_identifier',
'ParameterValue': 'true'
}
]
)
print('Modified target parameter group: ' + response['ParameterGroupName'])
response = redshift.create_cluster(
ClusterIdentifier=target_cluster_name,
NodeType='ra3.4xlarge',
NumberOfNodes=2,
Encrypted=True,
MasterUsername='username',
MasterUserPassword='Password01**',
ClusterParameterGroupName=target_param_group_name
)
print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier'])
# Retrieve the target cluster ARN
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Retrieve the current user's account ID
response = sts.get_caller_identity()
account_id = response['Account']
# Create a resource policy specifying cluster ARN and account ID
response = redshift.put_resource_policy(
ResourceArn=target_arn,
Policy='''
{
\"Version\":\"2012-10-17\",
\"Statement\":[
{\"Effect\":\"Allow\",
\"Principal\":{
\"Service\":\"redshift.amazonaws.com\"
},
\"Action\":[\"redshift:AuthorizeInboundIntegration\"],
\"Condition\":{
\"StringEquals\":{
\"aws:SourceArn\":\"%s\"}
}
},
{\"Effect\":\"Allow\",
\"Principal\":{
\"AWS\":\"arn:aws:iam::%s:root\"},
\"Action\":\"redshift:CreateInboundIntegration\"}
]
}
''' % (source_arn, account_id)
)
return(response)
def wait_for_cluster_availability(*args):
"""Waits for both clusters to be available"""
print('Waiting for clusters to be available...')
response = rds.describe_db_clusters(
DBClusterIdentifier=source_cluster_name
)
source_status = response['DBClusters'][0]['Status']
source_arn = response['DBClusters'][0]['DBClusterArn']
response = rds.describe_db_instances(
DBInstanceIdentifier=source_cluster_name + '-instance'
)
source_instance_status = response['DBInstances'][0]['DBInstanceStatus']
response = redshift.describe_clusters(
ClusterIdentifier=target_cluster_name
)
target_status = response['Clusters'][0]['ClusterStatus']
target_arn = response['Clusters'][0]['ClusterNamespaceArn']
# Every 60 seconds, check whether the clusters are available.
if source_status != 'available' or target_status != 'available' or source_instance_status != 'available':
time.sleep(60)
response = wait_for_cluster_availability(
source_cluster_name, target_cluster_name)
else:
print('Clusters available. Ready to create zero-ETL integration.')
create_integration(source_arn, target_arn)
return
def create_integration(source_arn, target_arn):
"""Creates a zero-ETL integration using the source and target clusters"""
response = rds.create_integration(
SourceArn=source_arn,
TargetArn=target_arn,
IntegrationName='my-integration'
)
print('Creating integration: ' + response['IntegrationName'])
def main():
"""main function"""
create_source_cluster(source_cluster_name, source_param_group_name)
wait_for_cluster_availability(source_cluster_name, target_cluster_name)
if __name__ == "__main__":
main()
ステップ 3b: Amazon SageMaker AI ゼロ ETL 統合用の AWS Glue カタログを作成する
Amazon SageMaker AI Lakehouse とのゼロ ETL 統合を作成する場合は、AWS Lake Formation で AWS Glue マネージドカタログを作成する必要があります。ターゲットカタログは Amazon Redshift マネージドカタログである必要があります。Amazon Redshift マネージドカタログを作成するには、まず AWSServiceRoleForRedshift サービスにリンクされたロールを作成します。Lake Formation コンソールで、読み取り専用管理者として AWSServiceRoleForRedshift を追加します。
その前のタスクの詳細については、次のトピックを参照してください。
ターゲット AWS Glue カタログのアクセス許可を設定する
ゼロ ETL 統合のターゲットカタログを作成する前に、Lake Formation ターゲット作成ロールと AWS Glue データ転送ロールを作成する必要があります。Lake Formation ターゲット作成ロールを使用して、ターゲットカタログを作成します。ターゲットカタログを作成するときは、[エンジンからのアクセス] セクションの [IAM ロール] フィールドに Glue データ転送ロールを入力します。
ターゲット作成ロールは Lake Formation 管理者であり、次のアクセス許可が必要です。
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "lakeformation:RegisterResource",
"Resource": "*"
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"s3:PutEncryptionConfiguration",
"iam:PassRole",
"glue:CreateCatalog",
"glue:GetCatalog",
"s3:PutBucketTagging",
"s3:PutLifecycleConfiguration",
"s3:PutBucketPolicy",
"s3:CreateBucket",
"redshift-serverless:CreateNamespace",
"s3:DeleteBucket",
"s3:PutBucketVersioning",
"redshift-serverless:CreateWorkgroup"
],
"Resource": [
"arn:aws:glue:*:111122223333:catalog",
"arn:aws:glue:*:111122223333:catalog/*",
"arn:aws:s3:::*",
"arn:aws:redshift-serverless:*:111122223333:workgroup/*",
"arn:aws:redshift-serverless:*:111122223333:namespace/*",
"arn:aws:iam::111122223333:role/GlueDataCatalogDataTransferRole"
]
}
]
}
ターゲット作成ロールには、以下の信頼関係が必要です。
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Action": "sts:AssumeRole"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::111122223333:user/Username"
},
"Action": "sts:AssumeRole"
}
]
}
Glue データ転送ロールは MySQL カタログオペレーションに必要であり、次のアクセス許可が必要です。
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Sid": "DataTransferRolePolicy",
"Effect": "Allow",
"Action": [
"kms:GenerateDataKey",
"kms:Decrypt",
"glue:GetCatalog",
"glue:GetDatabase"
],
"Resource": [
"*"
]
}
]
}
Glue データ転送ロールには、以下の信頼関係が必要です。
- JSON
-
-
{
"Version":"2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"glue.amazonaws.com",
"redshift.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
次のステップ
ソースの Aurora DB クラスターと、Amazon Redshift ターゲットデータウェアハウスまたは Amazon SageMaker AI Lakehouse により、ゼロ ETL 統合を作成してデータをレプリケートできます。手順については、「Amazon Redshift との Amazon ゼロ ETL 統合の作成」を参照してください。