새로운 Amazon MWAA 환경으로 마이그레이션 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

새로운 Amazon MWAA 환경으로 마이그레이션

기존 Apache Airflow 워크로드를 새로운 Amazon MWAA 환경으로 마이그레이션하려면 다음 단계를 알아보세요. 이러한 단계를 사용하여 이전 버전의 Amazon MWAA에서 새 버전 릴리스로 마이그레이션하거나 자체 관리형 Apache Airflow 배포를 Amazon MWAA로 마이그레이션할 수 있습니다. 이 튜토리얼에서는 기존 Apache Airflow v1.10.12에서 Apache Airflow v2.5.1을 실행하는 새 Amazon MWAA로 마이그레이션한다고 가정하지만, 동일한 절차를 사용하여 다른 Apache Airflow 버전에서 또는 다른 Apache Airflow 버전으로 마이그레이션할 수 있습니다.

사전 조건

단계를 완료하고 환경을 마이그레이션하려면 다음이 필요합니다.

1단계: 지원되는 최신 Apache Airflow 버전을 실행하는 새 Amazon MWAA 환경 생성

Amazon MWAA 사용 설명서의 Amazon MWAA 시작하기의 세부 단계를 사용하거나 AWS CloudFormation 템플릿을 사용하여 환경을 생성할 수 있습니다. 기존 Amazon MWAA 환경에서 마이그레이션하고 AWS CloudFormation 템플릿을 사용하여 이전 환경을 생성하는 경우 AirflowVersion 속성을 변경하여 새 버전을 지정할 수 있습니다.

MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion: 2.5.1 DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true

또는, 기존 Amazon MWAA 환경에서 마이그레이션하는 경우 AWS SDK for Python(Boto3)을 사용하는 다음 Python 스크립트를 복사하여 환경을 복제할 수 있습니다. 스크립트를 다운로드할 수 있습니다.

# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist; check the profile name') except IndexError as error: print("Error:", error)

2단계: 워크플로 리소스 마이그레이션

Apache Airflow v2는 메이저 버전 릴리스입니다. Apache Airflow v1에서 마이그레이션하는 경우 워크플로 리소스를 준비하고 DAG, 요구 사항 및 플러그인에 대한 변경 사항을 확인해야 합니다. 이렇게 하려면 Docker와 Amazon MWAA 로컬 러너를 사용하여 로컬 운영 체제에서 브리지 버전의 Apache Airflow를 구성하는 것이 좋습니다. Amazon MWAA 로컬 러너는 Amazon MWAA 환경을 로컬로 복제하는 명령줄 인터페이스(CLI) 유틸리티를 제공합니다.

Apache Airflow 버전을 변경할 때마다 requirements.txt에서 올바른 --constraint URL을 참조하는지 확인합니다.

워크플로 리소스를 마이그레이션하려면
  1. aws-mwaa-local-runner 리포지토리의 포크를 생성하고 Amazon MWAA 로컬 러너의 사본을 복제합니다.

  2. aws-mwaa-local-runner 리포지토리의 v1.10.15 브랜치를 확인합니다. Apache Airflow는 Apache Airflow v2로 마이그레이션하는 데 도움이 되는 브리지 릴리스로 v1.10.15를 릴리스했으며, Amazon MWAA가 v1.10.15를 지원하지 않지만 Amazon MWAA 로컬 러너를 사용하여 리소스를 테스트할 수 있습니다.

  3. Amazon MWAA 로컬 러너 CLI 도구를 사용하여 도커 이미지를 빌드하고 로컬에서 Apache Airflow를 실행할 수 있습니다. 자세한 내용은 GitHub 리포지토리에서 로컬 러너 README를 참조하십시오.

  4. Apache Airflow를 사용해 로컬에서 실행하려면 Apache Airflow 설명서 웹 사이트의 1.10에서 2로 업그레이드에 설명된 단계를 따릅니다.

    1. requirements.txt을(를) 업데이트하려면 Amazon MWAA 사용 설명서Python 종속성 관리에서 권장하는 모범 사례를 따르십시오.

    2. 사용자 지정 운영자 및 센서를 기존 Apache Airflow v1.10.12 환경용 플러그인과 함께 번들링한 경우 이를 DAG 폴더로 옮깁니다. Apache Airflow v2+의 모듈 관리 모범 사례에 대한 자세한 내용은 Apache Airflow 설명서 웹 사이트의 모듈 관리를 참조하세요.

  5. 워크플로 리소스에 대한 필수 변경 사항을 실시한 후에는 aws-mwaa-local-runner 리포지토리의 v2.5.1 브랜치를 체크아웃하고 업데이트된 워크플로우 DAG, 요구 사항 및 사용자 지정 플러그인을 로컬에서 테스트합니다. 다른 Apache Airflow 버전으로 마이그레이션하는 경우 해당 버전에 적합한 로컬 러너 브랜치를 대신 사용할 수 있습니다.

  6. 워크플로 리소스를 성공적으로 테스트한 후 DAG, requirements.txt 및 플러그인을 새 Amazon MWAA 환경으로 구성한 Amazon S3 버킷에 복사합니다.

3단계: 기존 환경에서 메타데이터 내보내기

업데이트된 DAG 파일을 환경의 Amazon S3 버킷에 복사하고 스케줄러가 이를 구문 분석하면 dag, dag_tagdag_code와 같은 Apache Airflow 메타데이터 테이블이 자동으로 채워집니다. 권한 관련 테이블도 사용자의 IAM 실행 역할 권한에 따라 자동으로 채워집니다. 이를 마이그레이션할 필요는 없습니다.

DAG 기록, variable, slot_pool, sla_miss와 관련된 데이터 및 필요한 경우 xcom, joblog 테이블과 관련된 데이터를 마이그레이션할 수 있습니다. 작업 인스턴스 로그는 airflow-{environment_name} 로그 그룹 아래의 CloudWatch Logs에 저장됩니다. 이전 실행의 작업 인스턴스 로그를 보려면 해당 로그를 새 환경 로그 그룹에 복사해야 합니다. 관련 비용을 줄이기 위해 며칠 분량의 로그만 옮기는 것이 좋습니다.

기존 Amazon MWAA 환경에서 마이그레이션하는 경우 메타데이터 데이터베이스에 직접 액세스할 수 없습니다. DAG를 실행하여 기존 Amazon MWAA 환경에서 선택한 Amazon S3 버킷으로 메타데이터를 내보내야 합니다. 자체 관리형 환경에서 마이그레이션하는 경우 다음 단계를 사용하여 Apache Airflow 메타데이터를 내보낼 수도 있습니다.

데이터를 내보낸 후에는 새 환경에서 DAG를 실행하여 데이터를 가져올 수 있습니다. 내보내기 및 가져오기 프로세스 중에 다른 모든 DAG는 일시 중지됩니다.

기존 환경에서 메타데이터를 내보내려면
  1. 를 사용하여 Amazon S3 버킷을 생성 AWS CLI 하여 내보낸 데이터를 저장합니다. UUIDregion를 사용자의 정보로 교체합니다.

    aws s3api create-bucket \ --bucket mwaa-migration-{UUID}\ --region {region}
    참고

    변수에 저장하는 연결과 같이 민감한 데이터를 마이그레이션하는 경우 Amazon S3 버킷의 기본 암호화를 활성화하는 것이 좋습니다.

  2. 참고

    자체 관리형 환경에서 마이그레이션하는 경우에는 적용되지 않습니다.

    기존 환경의 실행 역할을 수정하고 다음 정책을 추가하여 1단계에서 생성한 버킷에 쓰기 액세스 권한을 부여합니다.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  3. amazon-mwaa-examples 리포지토리를 복제하고 마이그레이션 시나리오의 metadata-migration 하위 디렉터리로 이동합니다.

    git clone https://github.com/aws-samples/amazon-mwaa-examples.git cd amazon-mwaa-examples/usecases/metadata-migration/existing-version-new-version/
  4. export_data.py에서 내보낸 메타데이터를 저장하기 위해 S3_BUCKET에 대한 문자열 값을 생성한 Amazon S3 버킷으로 교체합니다.

    S3_BUCKET = 'mwaa-migration-{UUID}'
  5. metadata-migration 디렉터리에서 requirements.txt 파일을 찾습니다. 기존 환경에 대한 요구 사항 파일이 이미 있는 경우 requirements.txt에 지정된 추가 요구 사항을 사용자 파일에 추가합니다. 기존 요구 사항 파일이 없는 경우 metadata-migration 디렉터리에 제공된 파일을 사용하면 됩니다.

  6. 기존 환경에 연결된 Amazon S3 버킷의 DAG 디렉터리에 export_data.py을 복사합니다. 자체 관리형 환경에서 마이그레이션하는 경우 사용자 /dags 폴더에 export_data.py을 복사합니다.

  7. 업데이트된 requirements.txt을 기존 환경과 연결된 Amazon S3 버킷으로 복사한 다음 환경을 편집하여 새 requirements.txt 버전을 지정합니다.

  8. 환경이 업데이트된 후 Apache Airflow UI에 액세스하여 db_export DAG의 일시 중지를 해제하고 워크플로가 실행되도록 트리거합니다.

  9. 메타데이터가 mwaa-migration-{UUID} Amazon S3 버킷의 data/migration/existing-version_to_new-version/export/로 내보내졌고 각 테이블이 전용 파일에 포함되어 있는지 확인합니다.

4단계: 새 환경으로 메타데이터 가져오기

새 환경으로 메타데이터를 가져오려면
  1. import_data.py에서 다음에 대한 문자열 값을 사용자의 정보로 교체합니다.

    • 기존 Amazon MWAA 환경에서 마이그레이션하는 경우:

      S3_BUCKET = 'mwaa-migration-{UUID}' OLD_ENV_NAME='{old_environment_name}' NEW_ENV_NAME='{new_environment_name}' TI_LOG_MAX_DAYS = {number_of_days}

      MAX_DAYS는 워크플로가 새 환경에 복사하는 로그 파일의 일수를 제어합니다.

    • 자체 관리형 환경에서 마이그레이션하는 경우

      S3_BUCKET = 'mwaa-migration-{UUID}' NEW_ENV_NAME='{new_environment_name}'
  2. (선택 사항) import_data.py은 실패한 작업 로그만 복사합니다. 모든 작업 로그를 복사하려면 getDagTasks 함수를 수정하고 다음 코드 조각에 표시된 대로 ti.state = 'failed'를 삭제합니다.

    def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
  3. 새 환경의 실행 역할을 수정하고 다음 정책을 추가합니다. 권한 정책을 통해 Amazon MWAA는 Apache Airflow 메타데이터를 내보낸 Amazon S3 버킷에서 읽고 기존 로그 그룹에서 작업 인스턴스 로그를 복사할 수 있습니다. 모든 자리 표시자를 사용자 정보로 교체합니다.

    참고

    자체 관리형 환경에서 마이그레이션하는 경우 정책에서 CloudWatch Logs 관련 권한을 삭제해야 합니다.

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:111122223333:log-group:airflow-{old_environment_name}*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}", "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  4. 새 환경과 연결된 Amazon S3 버킷의 DAG 디렉터리에 import_data.py을 복사한 다음 Apache Airflow UI에 액세스하여 db_import DAG를 일시 중지 해제하고 워크플로를 트리거합니다. 몇 분 후에 Apache Airflow UI에 새 DAG가 나타납니다.

  5. DAG 실행이 완료되면 각 개별 DAG에 액세스하여 DAG 실행 기록이 복사되었는지 확인합니다.

다음 단계