Quick start tutorial for Amazon MWAA - Amazon Managed Workflows for Apache Airflow


This quick start tutorial uses an AWS CloudFormation template that creates the Amazon VPC infrastructure, an Amazon S3 bucket with a dags folder, and an Amazon MWAA environment at the same time.

In this tutorial, you use three AWS Command Line Interface (AWS CLI) commands to upload a DAG to Amazon S3, run the DAG in Apache Airflow, and view logs in CloudWatch. Finally, you learn how to create an IAM policy for an Apache Airflow development team.

Note

The CloudFormation template on this page creates an Amazon MWAA environment for the latest Apache Airflow version available in CloudFormation. The latest available version is Apache Airflow v3.0.6.
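The template does not set an Airflow version explicitly. After Step 2 finishes, you may want to confirm which version the new environment actually runs. The following bash sketch makes two assumptions: the stack name is `mwaa-environment-public-network` (from Step 2), and the template names the environment `<stack name>-MwaaEnvironment`. The helper function names are hypothetical.

```shell
# Hypothetical helper: the template sets Name: !Sub "${AWS::StackName}-MwaaEnvironment",
# so the environment name follows directly from the stack name.
mwaa_env_name() {
  printf '%s-MwaaEnvironment\n' "$1"
}

# Print the Apache Airflow version reported by `aws mwaa get-environment`.
# Requires the AWS CLI with credentials for the account that owns the stack.
mwaa_airflow_version() {
  local stack_name="${1:-mwaa-environment-public-network}"
  aws mwaa get-environment \
    --name "$(mwaa_env_name "$stack_name")" \
    --query 'Environment.AirflowVersion' \
    --output text
}

# Example:
#   mwaa_airflow_version mwaa-environment-public-network
```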

The CloudFormation template creates the following:

  • VPC infrastructure. The template uses public routing over the internet. It uses the public network access mode for the Apache Airflow web server (WebserverAccessMode: PUBLIC_ONLY).

  • Amazon S3 bucket. The template creates an Amazon S3 bucket with a dags folder. The bucket blocks all public access and has bucket versioning enabled, as defined in Create an Amazon S3 bucket for Amazon MWAA.

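  • Amazon MWAA environment. The template creates an Amazon MWAA environment associated with the dags folder in the Amazon S3 bucket. It also creates an execution role with permission to the AWS services that Amazon MWAA uses. The environment uses the default encryption with an AWS owned key, as defined in Create an Amazon MWAA environment.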

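  • CloudWatch Logs. The template enables Apache Airflow logs in CloudWatch at the INFO level and above. It does this for the Airflow scheduler log group, Airflow web server log group, Airflow worker log group, Airflow DAG processing log group, and Airflow task log group, as defined in Accessing Airflow logs in Amazon CloudWatch.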
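After the stack is created, you can check the resources and log groups listed above from the command line. This is a minimal bash sketch that assumes the stack name from Step 2; the function names are hypothetical. The log group prefix mirrors the `log-group:airflow-${AWS::StackName}*` resource in the template's execution policy.

```shell
# Prefix shared by the environment's CloudWatch log groups, taken from the
# execution policy resource arn:...:log-group:airflow-${AWS::StackName}*.
mwaa_log_group_prefix() {
  printf 'airflow-%s\n' "$1"
}

# List the resources CloudFormation created, then the Airflow log groups.
show_stack_resources() {
  local stack_name="${1:-mwaa-environment-public-network}"
  aws cloudformation list-stack-resources \
    --stack-name "$stack_name" \
    --query 'StackResourceSummaries[].[LogicalResourceId,ResourceType,ResourceStatus]' \
    --output table || return 1
  aws logs describe-log-groups \
    --log-group-name-prefix "$(mwaa_log_group_prefix "$stack_name")" \
    --query 'logGroups[].logGroupName' \
    --output text
}

# Example:
#   show_stack_resources mwaa-environment-public-network
```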

In this tutorial, you complete the following tasks:

  • Upload and run a DAG. Upload the Apache Airflow tutorial DAG for the latest Apache Airflow version that Amazon MWAA supports to Amazon S3. Then run it in the Apache Airflow UI, as defined in Adding or updating DAGs.

  • View logs. View the Airflow web server log group in CloudWatch Logs, as defined in Accessing Airflow logs in Amazon CloudWatch.

  • Create an access control policy. Create an access control policy in IAM for your Apache Airflow development team, as defined in Accessing an Amazon MWAA environment.
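A common starting point for the access control task is a policy that lets developers open the Apache Airflow UI for one environment. The bash sketch below prints such a policy document. The `airflow:CreateWebLoginToken` action and the `arn:aws:airflow:<Region>:<account ID>:role/<environment name>/<Airflow role>` resource format follow the Amazon MWAA access documentation. The function name, the example policy name, and the default `User` Airflow role are assumptions to adapt to your team.

```shell
# Hypothetical helper: print an IAM policy that allows requesting an Airflow UI
# login token for one environment and one Airflow role (default: User).
mwaa_web_login_policy() {
  if [ "$#" -lt 3 ]; then
    echo "usage: mwaa_web_login_policy REGION ACCOUNT_ID ENV_NAME [AIRFLOW_ROLE]" >&2
    return 2
  fi
  local region="$1" account_id="$2" env_name="$3" airflow_role="${4:-User}"
  cat <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "airflow:CreateWebLoginToken",
      "Resource": "arn:aws:airflow:${region}:${account_id}:role/${env_name}/${airflow_role}"
    }
  ]
}
EOF
}

# Example (hypothetical account ID and policy name):
#   mwaa_web_login_policy us-east-1 111122223333 mwaa-environment-public-network-MwaaEnvironment > mwaa-dev-policy.json
#   aws iam create-policy --policy-name MwaaDeveloperWebAccess --policy-document file://mwaa-dev-policy.json
```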

Note

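In the VPC that hosts your Amazon MWAA environment, set assignIpv6AddressOnCreation to true for all attached subnets. This setting ensures that resources in those subnets are automatically assigned Internet Protocol version 6 (IPv6) addresses.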
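One way to apply this setting from the command line is `aws ec2 modify-subnet-attribute --assign-ipv6-address-on-creation`. The bash sketch below reads subnet IDs from the stack's `PublicSubnets` and `PrivateSubnets` outputs; the function names are hypothetical. The template on this page does not associate an IPv6 CIDR block with the VPC or its subnets. EC2 likely rejects this attribute on a subnet without one, so treat this as a sketch for VPCs that already have IPv6 configured.

```shell
# Turn a comma- or whitespace-separated list of subnet IDs into one ID per line.
split_ids() {
  printf '%s\n' "$1" | tr -s ', \t' '\n'
}

# Hypothetical helper: enable IPv6 address assignment on every subnet the stack outputs.
enable_ipv6_on_create() {
  local stack_name="${1:-mwaa-environment-public-network}"
  local ids subnet
  ids="$(aws cloudformation describe-stacks \
    --stack-name "$stack_name" \
    --query "Stacks[0].Outputs[?OutputKey=='PublicSubnets' || OutputKey=='PrivateSubnets'].OutputValue" \
    --output text)" || return 1
  for subnet in $(split_ids "$ids"); do
    aws ec2 modify-subnet-attribute \
      --subnet-id "$subnet" \
      --assign-ipv6-address-on-creation || return 1
  done
}
```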

Prerequisites

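The AWS Command Line Interface (AWS CLI) is an open source tool that you can use to interact with AWS services through commands in your command-line shell. To complete the steps in this section, you need the AWS CLI installed and configured with credentials for your AWS account.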
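The following bash sketch checks that the AWS CLI is installed and can find valid credentials; the function name is hypothetical. `aws sts get-caller-identity` fails when no valid credentials are configured, which makes it a convenient smoke test.

```shell
# Hypothetical helper: confirm the AWS CLI is on PATH and has usable credentials.
check_prereqs() {
  if ! command -v aws >/dev/null 2>&1; then
    echo "AWS CLI not found on PATH; install it before continuing." >&2
    return 1
  fi
  # Prints the installed CLI version, for example to confirm you are on version 2.
  aws --version || return 1
  # Fails if no credentials are configured or they are not valid.
  aws sts get-caller-identity --query Arn --output text
}

# Example:
#   check_prereqs && echo "Ready to create the stack."
```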

Step 1: Save the CloudFormation template locally

  • Copy the contents of the following template and save it locally as mwaa-public-network.yml. You can also download the template.

    AWSTemplateFormatVersion: "2010-09-09"

    Parameters:
      EnvironmentName:
        Description: An environment name that is prefixed to resource names
        Type: String
        Default: MWAAEnvironment

      VpcCIDR:
        Description: The IP range (CIDR notation) for this VPC
        Type: String
        Default: 10.192.0.0/16

      PublicSubnet1CIDR:
        Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone
        Type: String
        Default: 10.192.10.0/24

      PublicSubnet2CIDR:
        Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone
        Type: String
        Default: 10.192.11.0/24

      PrivateSubnet1CIDR:
        Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone
        Type: String
        Default: 10.192.20.0/24

      PrivateSubnet2CIDR:
        Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone
        Type: String
        Default: 10.192.21.0/24

      MaxWorkerNodes:
        Description: The maximum number of workers that can run in the environment
        Type: Number
        Default: 2

      DagProcessingLogs:
        Description: Log level for DagProcessing
        Type: String
        Default: INFO

      SchedulerLogsLevel:
        Description: Log level for SchedulerLogs
        Type: String
        Default: INFO

      TaskLogsLevel:
        Description: Log level for TaskLogs
        Type: String
        Default: INFO

      WorkerLogsLevel:
        Description: Log level for WorkerLogs
        Type: String
        Default: INFO

      WebserverLogsLevel:
        Description: Log level for WebserverLogs
        Type: String
        Default: INFO

    Resources:
      #####################################################################################################################
      # CREATE VPC
      #####################################################################################################################

      VPC:
        Type: AWS::EC2::VPC
        Properties:
          CidrBlock: !Ref VpcCIDR
          EnableDnsSupport: true
          EnableDnsHostnames: true
          Tags:
            - Key: Name
              Value: MWAAEnvironment

      InternetGateway:
        Type: AWS::EC2::InternetGateway
        Properties:
          Tags:
            - Key: Name
              Value: MWAAEnvironment

      InternetGatewayAttachment:
        Type: AWS::EC2::VPCGatewayAttachment
        Properties:
          InternetGatewayId: !Ref InternetGateway
          VpcId: !Ref VPC

      PublicSubnet1:
        Type: AWS::EC2::Subnet
        Properties:
          VpcId: !Ref VPC
          AvailabilityZone: !Select [ 0, !GetAZs '' ]
          CidrBlock: !Ref PublicSubnet1CIDR
          MapPublicIpOnLaunch: true
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Public Subnet (AZ1)

      PublicSubnet2:
        Type: AWS::EC2::Subnet
        Properties:
          VpcId: !Ref VPC
          AvailabilityZone: !Select [ 1, !GetAZs '' ]
          CidrBlock: !Ref PublicSubnet2CIDR
          MapPublicIpOnLaunch: true
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Public Subnet (AZ2)

      PrivateSubnet1:
        Type: AWS::EC2::Subnet
        Properties:
          VpcId: !Ref VPC
          AvailabilityZone: !Select [ 0, !GetAZs '' ]
          CidrBlock: !Ref PrivateSubnet1CIDR
          MapPublicIpOnLaunch: false
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Private Subnet (AZ1)

      PrivateSubnet2:
        Type: AWS::EC2::Subnet
        Properties:
          VpcId: !Ref VPC
          AvailabilityZone: !Select [ 1, !GetAZs '' ]
          CidrBlock: !Ref PrivateSubnet2CIDR
          MapPublicIpOnLaunch: false
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Private Subnet (AZ2)

      NatGateway1EIP:
        Type: AWS::EC2::EIP
        DependsOn: InternetGatewayAttachment
        Properties:
          Domain: vpc

      NatGateway2EIP:
        Type: AWS::EC2::EIP
        DependsOn: InternetGatewayAttachment
        Properties:
          Domain: vpc

      NatGateway1:
        Type: AWS::EC2::NatGateway
        Properties:
          AllocationId: !GetAtt NatGateway1EIP.AllocationId
          SubnetId: !Ref PublicSubnet1

      NatGateway2:
        Type: AWS::EC2::NatGateway
        Properties:
          AllocationId: !GetAtt NatGateway2EIP.AllocationId
          SubnetId: !Ref PublicSubnet2

      PublicRouteTable:
        Type: AWS::EC2::RouteTable
        Properties:
          VpcId: !Ref VPC
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Public Routes

      DefaultPublicRoute:
        Type: AWS::EC2::Route
        DependsOn: InternetGatewayAttachment
        Properties:
          RouteTableId: !Ref PublicRouteTable
          DestinationCidrBlock: 0.0.0.0/0
          GatewayId: !Ref InternetGateway

      PublicSubnet1RouteTableAssociation:
        Type: AWS::EC2::SubnetRouteTableAssociation
        Properties:
          RouteTableId: !Ref PublicRouteTable
          SubnetId: !Ref PublicSubnet1

      PublicSubnet2RouteTableAssociation:
        Type: AWS::EC2::SubnetRouteTableAssociation
        Properties:
          RouteTableId: !Ref PublicRouteTable
          SubnetId: !Ref PublicSubnet2

      PrivateRouteTable1:
        Type: AWS::EC2::RouteTable
        Properties:
          VpcId: !Ref VPC
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Private Routes (AZ1)

      DefaultPrivateRoute1:
        Type: AWS::EC2::Route
        Properties:
          RouteTableId: !Ref PrivateRouteTable1
          DestinationCidrBlock: 0.0.0.0/0
          NatGatewayId: !Ref NatGateway1

      PrivateSubnet1RouteTableAssociation:
        Type: AWS::EC2::SubnetRouteTableAssociation
        Properties:
          RouteTableId: !Ref PrivateRouteTable1
          SubnetId: !Ref PrivateSubnet1

      PrivateRouteTable2:
        Type: AWS::EC2::RouteTable
        Properties:
          VpcId: !Ref VPC
          Tags:
            - Key: Name
              Value: !Sub ${EnvironmentName} Private Routes (AZ2)

      DefaultPrivateRoute2:
        Type: AWS::EC2::Route
        Properties:
          RouteTableId: !Ref PrivateRouteTable2
          DestinationCidrBlock: 0.0.0.0/0
          NatGatewayId: !Ref NatGateway2

      PrivateSubnet2RouteTableAssociation:
        Type: AWS::EC2::SubnetRouteTableAssociation
        Properties:
          RouteTableId: !Ref PrivateRouteTable2
          SubnetId: !Ref PrivateSubnet2

      SecurityGroup:
        Type: AWS::EC2::SecurityGroup
        Properties:
          GroupName: "mwaa-security-group"
          GroupDescription: "Security group with a self-referencing inbound rule."
          VpcId: !Ref VPC

      SecurityGroupIngress:
        Type: AWS::EC2::SecurityGroupIngress
        Properties:
          GroupId: !Ref SecurityGroup
          IpProtocol: "-1"
          SourceSecurityGroupId: !Ref SecurityGroup

      EnvironmentBucket:
        Type: AWS::S3::Bucket
        Properties:
          VersioningConfiguration:
            Status: Enabled
          PublicAccessBlockConfiguration:
            BlockPublicAcls: true
            BlockPublicPolicy: true
            IgnorePublicAcls: true
            RestrictPublicBuckets: true

      #####################################################################################################################
      # CREATE MWAA
      #####################################################################################################################

      MwaaEnvironment:
        Type: AWS::MWAA::Environment
        DependsOn: MwaaExecutionPolicy
        Properties:
          Name: !Sub "${AWS::StackName}-MwaaEnvironment"
          SourceBucketArn: !GetAtt EnvironmentBucket.Arn
          ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn
          DagS3Path: dags/
          NetworkConfiguration:
            SecurityGroupIds:
              - !GetAtt SecurityGroup.GroupId
            SubnetIds:
              - !Ref PrivateSubnet1
              - !Ref PrivateSubnet2
          WebserverAccessMode: PUBLIC_ONLY
          MaxWorkers: !Ref MaxWorkerNodes
          LoggingConfiguration:
            DagProcessingLogs:
              LogLevel: !Ref DagProcessingLogs
              Enabled: true
            SchedulerLogs:
              LogLevel: !Ref SchedulerLogsLevel
              Enabled: true
            TaskLogs:
              LogLevel: !Ref TaskLogsLevel
              Enabled: true
            WorkerLogs:
              LogLevel: !Ref WorkerLogsLevel
              Enabled: true
            WebserverLogs:
              LogLevel: !Ref WebserverLogsLevel
              Enabled: true

      MwaaExecutionRole:
        Type: AWS::IAM::Role
        Properties:
          AssumeRolePolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Principal:
                  Service:
                    - airflow-env.amazonaws.com
                    - airflow.amazonaws.com
                Action:
                  - "sts:AssumeRole"
          Path: "/service-role/"

      MwaaExecutionPolicy:
        DependsOn: EnvironmentBucket
        Type: AWS::IAM::ManagedPolicy
        Properties:
          Roles:
            - !Ref MwaaExecutionRole
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: airflow:PublishMetrics
                Resource:
                  - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${EnvironmentName}"
              - Effect: Deny
                Action: s3:ListAllMyBuckets
                Resource:
                  - !Sub "${EnvironmentBucket.Arn}"
                  - !Sub "${EnvironmentBucket.Arn}/*"
              - Effect: Allow
                Action:
                  - "s3:GetObject*"
                  - "s3:GetBucket*"
                  - "s3:List*"
                Resource:
                  - !Sub "${EnvironmentBucket.Arn}"
                  - !Sub "${EnvironmentBucket.Arn}/*"
              - Effect: Allow
                Action:
                  - logs:DescribeLogGroups
                Resource: "*"
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:CreateLogGroup
                  - logs:PutLogEvents
                  - logs:GetLogEvents
                  - logs:GetLogRecord
                  - logs:GetLogGroupFields
                  - logs:GetQueryResults
                  - logs:DescribeLogGroups
                Resource:
                  - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*"
              - Effect: Allow
                Action: cloudwatch:PutMetricData
                Resource: "*"
              - Effect: Allow
                Action:
                  - sqs:ChangeMessageVisibility
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                  - sqs:GetQueueUrl
                  - sqs:ReceiveMessage
                  - sqs:SendMessage
                Resource:
                  - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*"
              - Effect: Allow
                Action:
                  - kms:Decrypt
                  - kms:DescribeKey
                  - "kms:GenerateDataKey*"
                  - kms:Encrypt
                NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*"
                Condition:
                  StringLike:
                    "kms:ViaService":
                      - !Sub "sqs.${AWS::Region}.amazonaws.com"

    Outputs:
      VPC:
        Description: A reference to the created VPC
        Value: !Ref VPC

      PublicSubnets:
        Description: A list of the public subnets
        Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]]

      PrivateSubnets:
        Description: A list of the private subnets
        Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]]

      PublicSubnet1:
        Description: A reference to the public subnet in the 1st Availability Zone
        Value: !Ref PublicSubnet1

      PublicSubnet2:
        Description: A reference to the public subnet in the 2nd Availability Zone
        Value: !Ref PublicSubnet2

      PrivateSubnet1:
        Description: A reference to the private subnet in the 1st Availability Zone
        Value: !Ref PrivateSubnet1

      PrivateSubnet2:
        Description: A reference to the private subnet in the 2nd Availability Zone
        Value: !Ref PrivateSubnet2

      SecurityGroupIngress:
        Description: Security group with self-referencing inbound rule
        Value: !Ref SecurityGroupIngress

      MwaaApacheAirflowUI:
        Description: MWAA Environment
        Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"
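Before creating the stack, you can ask CloudFormation to check the saved template. `aws cloudformation validate-template` checks the template structure only; it does not guarantee that every resource will deploy. Here is a minimal bash sketch with a hypothetical function name.

```shell
# Hypothetical helper: validate a local CloudFormation template file.
validate_template() {
  local template="${1:-mwaa-public-network.yml}"
  if [ ! -f "$template" ]; then
    echo "Template not found: $template" >&2
    return 1
  fi
  aws cloudformation validate-template --template-body "file://$template"
}

# Example:
#   validate_template mwaa-public-network.yml
```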

Step 2: Create the stack using the AWS CLI

  1. At the command prompt, navigate to the directory where mwaa-public-network.yml is stored. For example:

    cd mwaaproject
  2. Run the aws cloudformation create-stack command to create the stack with the AWS CLI.

    aws cloudformation create-stack --stack-name mwaa-environment-public-network --template-body file://mwaa-public-network.yml --capabilities CAPABILITY_IAM
    Note

    Creating the Amazon VPC infrastructure, Amazon S3 bucket, and Amazon MWAA environment takes more than 30 minutes.
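Instead of watching the console, you can block until stack creation finishes and then print the stack outputs, including the `MwaaApacheAirflowUI` URL. This is a minimal bash sketch that assumes the stack name from the create-stack command above; the function name is hypothetical. If the waiter times out before creation finishes, you can run it again.

```shell
# Hypothetical helper: wait for stack creation, then print the stack outputs.
wait_and_show_outputs() {
  local stack_name="${1:-mwaa-environment-public-network}"
  # Returns non-zero if creation fails or the waiter gives up before
  # the stack reaches CREATE_COMPLETE.
  aws cloudformation wait stack-create-complete --stack-name "$stack_name" || return 1
  aws cloudformation describe-stacks \
    --stack-name "$stack_name" \
    --query 'Stacks[0].Outputs[].[OutputKey,OutputValue]' \
    --output table
}

# Example:
#   wait_and_show_outputs mwaa-environment-public-network
```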

Step 3: Upload a DAG to Amazon S3 and run it in the Apache Airflow UI

  1. Copy the contents of the tutorial.py file for the latest supported Apache Airflow version, and save it locally as tutorial.py.

  2. At the command prompt, navigate to the directory where tutorial.py is stored. For example:

    cd mwaaproject
  3. The following command lists all of your Amazon S3 buckets.

    aws s3 ls
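  4. Use the following command to list the files and folders in your environment's Amazon S3 bucket. Replace amzn-s3-demo-bucket with the name of your bucket.

    aws s3 ls s3://amzn-s3-demo-bucket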
  5. Use the following command to upload tutorial.py to the dags folder. Replace amzn-s3-demo-bucket with the name of your bucket.

    aws s3 cp tutorial.py s3://amzn-s3-demo-bucket/dags/
  6. Open the Environments page on the Amazon MWAA console.

  7. Choose your environment.

  8. Choose Open Airflow UI.

  9. In the Apache Airflow UI, choose the tutorial DAG from the list of available DAGs.

  10. On the DAG details page, choose the Pause/Unpause DAG toggle next to the DAG name to unpause the DAG.

  11. Choose Trigger DAG.
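You can also script the upload in item 5 and the unpause and trigger actions in items 10 and 11. This bash sketch rests on a few assumptions worth checking:

- CloudFormation generates the bucket name because the template does not set `BucketName`, so the sketch reads it from the `EnvironmentBucket` logical ID.
- DAGs are unpaused and triggered through the Amazon MWAA CLI token flow (`aws mwaa create-cli-token` plus an HTTPS POST to `/aws_mwaa/cli`). Confirm that your environment's Airflow version supports `dags unpause` and `dags trigger` there.
- The function names are hypothetical, and the tutorial DAG ID is assumed to be `tutorial`.

```shell
# CloudFormation generates the bucket name (the template sets no BucketName),
# so look it up from the EnvironmentBucket logical ID.
stack_bucket_name() {
  local stack_name="${1:-mwaa-environment-public-network}"
  aws cloudformation describe-stack-resource \
    --stack-name "$stack_name" \
    --logical-resource-id EnvironmentBucket \
    --query 'StackResourceDetail.PhysicalResourceId' \
    --output text
}

# Build the destination URI under the dags/ prefix (DagS3Path in the template).
dag_s3_uri() {
  printf 's3://%s/dags/%s\n' "$1" "$(basename "$2")"
}

# Upload a local DAG file to the stack's bucket.
upload_dag() {
  local dag_file="${1:-tutorial.py}" stack_name="${2:-mwaa-environment-public-network}"
  local bucket
  if [ ! -f "$dag_file" ]; then
    echo "DAG file not found: $dag_file" >&2
    return 1
  fi
  bucket="$(stack_bucket_name "$stack_name")" || return 1
  aws s3 cp "$dag_file" "$(dag_s3_uri "$bucket" "$dag_file")"
}

# Run one Airflow CLI command through the Amazon MWAA CLI endpoint.
# The JSON response carries Base64-encoded stdout and stderr fields.
mwaa_cli() {
  if [ "$#" -lt 2 ]; then
    echo "usage: mwaa_cli ENV_NAME 'AIRFLOW CLI COMMAND'" >&2
    return 2
  fi
  local env_name="$1" cli_command="$2" out token host
  out="$(aws mwaa create-cli-token --name "$env_name" \
    --query '[CliToken,WebServerHostname]' --output text)" || return 1
  read -r token host <<<"$out"
  curl --silent --show-error --request POST "https://${host}/aws_mwaa/cli" \
    --header "Authorization: Bearer ${token}" \
    --header "Content-Type: text/plain" \
    --data-raw "$cli_command"
}

# Example (environment name follows the template's <stack name>-MwaaEnvironment pattern):
#   upload_dag tutorial.py
#   mwaa_cli mwaa-environment-public-network-MwaaEnvironment "dags unpause tutorial"
#   mwaa_cli mwaa-environment-public-network-MwaaEnvironment "dags trigger tutorial"
```

A newly uploaded DAG file can take a short time to appear in Apache Airflow, so the unpause command may fail if you run it immediately after the upload.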

Step 4: View logs in CloudWatch Logs

You can view all the Apache Airflow logs that the CloudFormation stack enabled in the CloudWatch console. The following steps show how to view the logs in the Airflow web server log group.

  1. Open the Environments page on the Amazon MWAA console.

  2. Choose your environment.

  3. On the Monitoring pane, choose the Airflow web server log group.

  4. Under Log streams, choose the webserver_console_ip log.
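You can also read the same log group from a terminal with `aws logs tail`, which is available in AWS CLI version 2. The bash sketch below assumes that Amazon MWAA names the web server log group `airflow-<environment name>-WebServer`. That naming is consistent with the `airflow-${AWS::StackName}*` log group prefix in the template's execution policy. To confirm the exact name in your account, run `aws logs describe-log-groups --log-group-name-prefix airflow-`. The function names are hypothetical.

```shell
# Assumed naming: airflow-<environment name>-WebServer, where the template names
# the environment <stack name>-MwaaEnvironment.
webserver_log_group() {
  local stack_name="${1:-mwaa-environment-public-network}"
  printf 'airflow-%s-MwaaEnvironment-WebServer\n' "$stack_name"
}

# Print web server log events from the last hour; add --follow to keep streaming.
tail_webserver_logs() {
  local stack_name="${1:-mwaa-environment-public-network}"
  aws logs tail "$(webserver_log_group "$stack_name")" --since 1h
}

# Example:
#   tail_webserver_logs mwaa-environment-public-network
```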

What's next?