Amazon RDS ゼロ ETL 統合の開始方法 - Amazon Relational Database Service

Amazon RDS ゼロ ETL 統合の開始方法

ゼロ ETL 統合を作成する前に、必要なパラメータとアクセス許可で RDS データベースとデータウェアハウスを設定します。セットアップ時には、以下の手順を完了します。

これらのタスクが完了したら、「Amazon Redshift との Amazon RDS ゼロ ETL 統合の作成」または「Amazon SageMaker Lakehouse との Amazon RDS ゼロ ETL 統合の作成」に進みます。

ヒント

RDS では、統合の作成中にこれらのセットアップ手順を手動で行わずに、自動的に完了させることができます。統合の作成をすぐに開始するには、「Amazon Redshift との Amazon RDS ゼロ ETL 統合の作成」を参照してください。

ステップ 3 では、必要に応じて、ターゲットデータウェアハウス (ステップ 3a) またはターゲットレイクハウス (ステップ 3b) のいずれかを作成できます。

  • SQL ベースの分析で従来のデータウェアハウス機能が必要な場合は、データウェアハウスを選択します。

  • 機械学習機能が必要であるとともに、データサイエンスと ML ワークフローに Lakehouse 機能を使用する場合は、Amazon SageMaker AI Lakehouse を選択します。

ステップ 1: カスタム DB のパラメータグループを作成する

Amazon RDS ゼロ ETL 統合では、データのレプリケーションを制御する特定の値が DB パラメータに必要です。特定のパラメータは、ソース DB エンジンによって異なります。これらのパラメータを設定するには、まずカスタム DB パラメータグループを作成して、これをソースデータベースに関連付ける必要があります。ソース DB エンジンに応じて、以下のパラメータ値を設定します。カスタムパラメータグループを作成するには、「Amazon RDS DB インスタンスの DB パラメータグループ」を参照してください。依存関係の問題を避けるため、すべてのパラメータ値を同じリクエスト内で設定することをお勧めします。

RDS for MySQL:

  • binlog_format=ROW

  • binlog_row_image=full

また、binlog_row_value_options パラメータが PARTIAL_JSON設定されていないことも確認してください。ソースデータベースがマルチ AZ DB クラスターの場合は、binlog_transaction_compression パラメータが ON に設定されていないことを確認します。

これらのパラメータの一部 (binlog_format など) は動的です。つまり、再起動をトリガーせずにパラメータに変更を適用できます。これは、一部の既存のセッションがパラメータの古い値を使用し続ける可能性があることを意味します。これに伴ってゼロ ETL 統合の作成時に問題が起きないようにするには、パフォーマンススキーマを有効にします。パフォーマンススキーマにより、ゼロ ETL 事前チェックが実行されるため、プロセスの早い段階で欠落しているパラメータを検出できます。

RDS for PostgreSQL:

  • rds.logical_replication = 1

  • rds.replica_identity_full = 1

  • session_replication_role = origin

  • wal_sender_timeout ≥ 20000 or = 0

  • max_wal_senders ≥ 20

  • max_replication_slots ≥ 20

複数の PostgreSQL 統合の場合、統合ごとに 1 つの論理レプリケーションスロットが使用されます。使用状況に基づいて max_replication_slots パラメータと max_wal_senders パラメータを確認します。

ゼロ ETL 統合で効率的なデータ同期を行うには、ソース DB インスタンスで rds.replica_identity_full を設定します。これにより、UPDATE および DELETE オペレーション中に、先行書き込みログ (WAL) にプライマリキー情報だけでなく、行全体のデータを記録するようデータベースに指示します。ゼロ ETL では、レプリケートされたすべてのテーブルに必要なプライマリキーがある場合でも、行全体のデータが必要です。クエリ時にどのデータを表示するかを決定するために、Amazon Redshift は特殊なアンチ結合戦略を使用して、データを内部の削除追跡テーブルと比較します。Amazon Redshift は、行全体のイメージをログに記録することで、これらのアンチ結合を効率的に実行できます。行全体のデータがない場合、Amazon Redshift は追加のルックアップを実行する必要があるため、Amazon Redshift が使用する列指向エンジンの高スループットオペレーション中にパフォーマンスが低下する可能性があります。

重要

行全体をログに記録するようにレプリカ ID を設定すると、WAL ボリュームが増加します。これに伴って、特に幅の広いテーブルや頻繁な更新において、書き込み増幅と I/O 使用量が高くなる可能性があります。このような影響に備えるには、ストレージ容量と I/O 要件を計画し、WAL の増加をモニタリングして、書き込みが多いワークロードのレプリケーション遅延を追跡します。

RDS for Oracle:

RDS for Oracle ではパラメータの変更は不要です。

ステップ 2: ソースデータベースを選択または作成する

カスタム DB のパラメータグループを作成したら、RDS DB インスタンスを選択または作成します。このデータベースは、ターゲットデータウェアハウスへのデータレプリケーションのソースになります。シングル AZ DB インスタンスまたはマルチ AZ DB インスタンスの作成手順については、Amazon RDS DB インスタンスの作成を参照してください。マルチ AZ DB クラスター (RDS for MySQL のみ) を作成する手順については、「Amazon RDS 用のマルチ AZ DB クラスターの作成」を参照してください

データベースは、サポートされている DB エンジンバージョンを実行している必要があります。サポートされているバージョンのリストについては、「Amazon RDS ゼロ ETL 統合でサポートされているリージョンと DB エンジン」を参照してください。

データベースを作成するには、[追加設定] で、デフォルトの DB パラメータグループを、前のステップで作成したカスタムパラメータグループに変更します。

注記

データベースの作成にパラメータグループをデータベースに関連付ける場合は、ゼロ ETL 統合を作成する前にデータベースを再起動して変更を適用する必要があります。手順については、「 DB インスタンスの再起動」または「Amazon RDS のマルチ AZ DB クラスターとリーダー DB インスタンスの再起動」を参照してください。

さらに、データベースで自動バックアップが有効になっていることを確認してください。詳細については、「自動バックアップの有効化」を参照してください。

ステップ 3a: ターゲットデータウェアハウスを作成する

ソースデータベースを作成した後、ターゲットのデータウェアハウスを作成して設定する必要があります。データウェアハウスは、以下の要件を満たしている必要があります。

  • 少なくとも 2 つのノードがある RA3 ノードタイプまたは Redshift Serverless を使用している。

  • 暗号化されている (プロビジョニングされたクラスターを使用している場合) 詳細については、「Amazon Redshift データベースの暗号化」を参照してください。

データウェアハウスを作成する手順については、プロビジョニングされたクラスター用の「クラスターの作成」またはRedshift Serverless 用の「名前空間を使用したワークグループの作成」を参照してください。

データウェアハウスで大文字と小文字の区別を有効にします。

統合を正常に行うには、データウェアハウスで大文字と小文字を区別するパラメータ (enable_case_sensitive_identifier) を有効にする必要があります。デフォルトでは、プロビジョニング済みクラスターと Redshift Serverless ワークグループの大文字と小文字の区別は無効になっています。

大文字と小文字の区別を有効にするには、データウェアハウスのタイプに応じて以下の手順を実行します。

  • プロビジョニングされたクラスター — プロビジョニングされたクラスターで大文字と小文字の区別を有効にするには、enable_case_sensitive_identifier パラメータを有効にしたカスタムパラメータグループを作成します。次に、そのパラメータグループとクラスターを関連付けます。手順については、「コンソールを使用したパラメータグループの管理」または「AWS CLI を使用したパラメータ値の設定」を参照してください。

    注記

    クラスターにパラメータグループを関連付けたら、クラスターを再起動することを忘れないでください。

  • Serverless ワークグループ — Redshift Serverless ワークグループで大文字と小文字の区別を有効にするには、AWS CLI を使用する必要があります。Amazon Redshift コンソールは現在、Redshift Serverless パラメータ値の変更をサポートしていません。次の update-workgroup リクエストを送信します。

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    パラメータ値を変更した後、ワークグループを再起動する必要はありません。

データウェアハウスの認可を設定します。

データウェアハウスを作成したら、ソース RDS データベースを承認済みの統合ソースとして設定する必要があります。手順については、「Amazon Redshift データウェアハウスの認可を設定する」を参照してください。

AWS を使用して統合をセットアップする

各リソースを手動でセットアップするのではなく、次の Python スクリプトを実行して、必要なリソースを自動的にセットアップできます。このコード例では AWS SDK for Python (Boto3) を使用してソース RDS for MySQL DB インスタンスとターゲットデータウェアハウスを作成し、それぞれに必要なパラメータ値を指定します。次に、データベースが使用可能になるまで待ってから、データベース間にゼロ ETL 統合を作成します。設定する必要があるリソースに応じて、さまざまな関数をコメントアウトできます。

必要な従属関係をインストールには、次のコマンドを実行します。

pip install boto3 pip install time

スクリプト内で、オプションでソース、ターゲット、パラメータグループの名前を変更します。最後の関数は、リソースのセットアップ後に my-integration という名前の統合を作成します。

import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_db_name = 'my-source-db' # A name for the source database source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_db(*args): """Creates a source RDS for MySQL DB instance""" response = rds.create_db_parameter_group( DBParameterGroupName=source_param_group_name, DBParameterGroupFamily='mysql8.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBParameterGroup']['DBParameterGroupName']) response = rds.modify_db_parameter_group( DBParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBParameterGroupName']) response = rds.create_db_instance( DBInstanceIdentifier=source_db_name, DBParameterGroupName=source_param_group_name, Engine='mysql', EngineVersion='8.0.32', DBName='mydb', DBInstanceClass='db.m5.large', AllocatedStorage=15, MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source database: ' + response['DBInstance']['DBInstanceIdentifier']) source_arn = (response['DBInstance']['DBInstanceArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy granting access to source database and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_db_availability(*args): """Waits for both databases to be available""" print('Waiting for source and target to be available...') response = rds.describe_db_instances( DBInstanceIdentifier=source_db_name ) source_status = response['DBInstances'][0]['DBInstanceStatus'] source_arn = response['DBInstances'][0]['DBInstanceArn'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the databases are available if source_status != 'available' or target_status != 'available': time.sleep(60) response = wait_for_db_availability( source_db_name, target_cluster_name) else: print('Databases available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target databases""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_db(source_db_name, source_param_group_name) wait_for_db_availability(source_db_name, target_cluster_name) if __name__ == "__main__": main()

ステップ 3b: Amazon SageMaker AI ゼロ ETL 統合用の AWS Glue カタログを作成する

Amazon SageMaker AI Lakehouse とのゼロ ETL 統合を作成する場合は、AWS Lake Formation で AWS Glue マネージドカタログを作成する必要があります。ターゲットカタログは Amazon Redshift マネージドカタログである必要があります。Amazon Redshift マネージドカタログを作成するには、まず AWSServiceRoleForRedshift サービスにリンクされたロールを作成します。Lake Formation コンソールで、読み取り専用管理者として AWSServiceRoleForRedshift を追加します。

その前のタスクの詳細については、次のトピックを参照してください。

ターゲット AWS Glue カタログのアクセス許可を設定する

ゼロ ETL 統合のターゲットカタログを作成する前に、Lake Formation ターゲット作成ロールと AWS Glue データ転送ロールを作成する必要があります。Lake Formation ターゲット作成ロールを使用して、ターゲットカタログを作成します。ターゲットカタログを作成するときは、[エンジンからのアクセス] セクションの [IAM ロール] フィールドに Glue データ転送ロールを入力します。

ターゲット作成ロールは Lake Formation 管理者であり、次のアクセス許可が必要です。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "lakeformation:RegisterResource", "Resource": "*" }, { "Sid": "VisualEditor1", "Effect": "Allow", "Action": [ "s3:PutEncryptionConfiguration", "iam:PassRole", "glue:CreateCatalog", "glue:GetCatalog", "s3:PutBucketTagging", "s3:PutLifecycleConfiguration", "s3:PutBucketPolicy", "s3:CreateBucket", "redshift-serverless:CreateNamespace", "s3:DeleteBucket", "s3:PutBucketVersioning", "redshift-serverless:CreateWorkgroup" ], "Resource": [ "arn:aws:glue:*:111122223333:catalog", "arn:aws:glue:*:111122223333:catalog/*", "arn:aws:s3:::*", "arn:aws:redshift-serverless:*:111122223333:workgroup/*", "arn:aws:redshift-serverless:*:111122223333:namespace/*", "arn:aws:iam::111122223333:role/GlueDataCatalogDataTransferRole" ] } ] }

ターゲット作成ロールには、以下の信頼関係が必要です。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "glue.amazonaws.com" }, "Action": "sts:AssumeRole" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::111122223333:user/Username" }, "Action": "sts:AssumeRole" } ] }

Glue データ転送ロールは MySQL カタログオペレーションに必要であり、次のアクセス許可が必要です。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "DataTransferRolePolicy", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt", "glue:GetCatalog", "glue:GetDatabase" ], "Resource": [ "*" ] } ] }

Glue データ転送ロールには、以下の信頼関係が必要です。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "glue.amazonaws.com", "redshift.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

次のステップ

ソースの RDS データベースと、Amazon Redshift ターゲットデータウェアハウスまたは Amazon SageMaker AI Lakehouse により、ゼロ ETL 統合を作成してデータをレプリケートできます。手順については、「Amazon Redshift との Amazon RDS ゼロ ETL 統合の作成」を参照してください。