Tutorial de inicio rápido de Amazon Managed Workflows para Apache Airflow
En este tutorial de inicio rápido, se usa una plantilla de AWS CloudFormation que crea la infraestructura de Amazon VPC, un bucket de Amazon S3 con una carpeta dags y un entorno de Amazon Managed Workflows para Apache Airflow al mismo tiempo.
Temas
En este tutorial:
En este tutorial, se explican tres comandos de la AWS Command Line Interface (AWS CLI) para cargar un DAG en Amazon S3, ejecutarlo en Apache Airflow y ver los registros en CloudWatch. Por último, aprenderá a crear una política de IAM para un equipo de desarrollo de Apache Airflow.
nota
La plantilla de CloudFormation de esta página crea un entorno de Amazon Managed Workflows para Apache Airflow para la última versión de Apache Airflow disponible en CloudFormation. La última versión disponible es Apache Airflow v3.0.6.
La plantilla de CloudFormation crea los siguientes recursos:
-
Infraestructura de VPC. La plantilla utiliza Enrutamiento público a través de Internet. Usa el Modo de acceso mediante red pública para el servidor web Apache Airflow en
WebserverAccessMode: PUBLIC_ONLY. -
Bucket de Amazon S. La plantilla crea un bucket de Amazon S3 con una carpeta
dags. Está configurada para bloquear todo el acceso público, con el control de versiones de buckets activado, tal y como se define en Creación de un bucket de Amazon S3 para Amazon MWAA. -
Entorno de Amazon MWAA. La plantilla crea un entorno de Amazon MWAA asociado a la carpeta
dagsdel bucket de Amazon S3, un rol de ejecución con permiso para los servicios de AWS usados por Amazon MWAA y el cifrado predeterminado mediante una clave de AWS propia, tal y como se define en Creación de entornos de Amazon MWAA. -
Registros de CloudWatch. La plantilla permite registros de Apache Airflow en CloudWatch en el nivel INFO y superior para el grupo de registros del programador de Airflow, el grupo de registros del servidor web de Airflow, el grupo de registros del proceso de trabajo de Airflow, el grupo de registros de procesamiento del DAG de Airflow y el grupo de registros de tareas de Airflow, tal como se define en Visualización de registros en Amazon CloudWatch.
Este tutorial guía por los siguientes pasos:
-
Carga y ejecutar un DAG. Cargue el DAG tutorial de Apache Airflow de la última versión de Apache Airflow compatible con Amazon MWAA en Amazon S3 y, a continuación, ejecútelo en la interfaz de usuario de Apache Airflow, tal y como se define en Cómo añadir o actualizar DAG.
-
Registros de acceso. Vea el grupo de registros del servidor web de Airflow en CloudWatch Logs, tal y como se define en Visualización de registros en Amazon CloudWatch.
-
Crear una política de control de acceso. Cree una política de control de acceso en IAM para su equipo de desarrollo de Apache Airflow, tal como se define en Acceso a un entorno de Amazon MWAA.
nota
En la VPC que aloja el entorno de Amazon MWAA, establezca assignIpv6AddressOnCreation en true para todas las subredes conectadas. Esta configuración garantiza la asignación automática de direcciones del protocolo de Internet versión 6 (IPv6) a los recursos de estas subredes.
Requisitos previos
La AWS Command Line Interface (AWS CLI) es una herramienta de código abierto que le permite interactuar con los servicios de AWS mediante el uso de comandos en el shell de la línea de comandos. Para completar los pasos de esta página, necesita lo siguiente:
Paso uno: guarde la plantilla de CloudFormation localmente
-
Copie el contenido de la siguiente plantilla y guárdelo localmente como
mwaa-public-network.yml. También puede descargar la plantilla.AWSTemplateFormatVersion: "2010-09-09" Parameters: EnvironmentName: Description: An environment name that is prefixed to resource names Type: String Default: MWAAEnvironment VpcCIDR: Description: The IP range (CIDR notation) for this VPC Type: String Default: 10.192.0.0/16 PublicSubnet1CIDR: Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone Type: String Default: 10.192.10.0/24 PublicSubnet2CIDR: Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone Type: String Default: 10.192.11.0/24 PrivateSubnet1CIDR: Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone Type: String Default: 10.192.20.0/24 PrivateSubnet2CIDR: Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone Type: String Default: 10.192.21.0/24 MaxWorkerNodes: Description: The maximum number of workers that can run in the environment Type: Number Default: 2 DagProcessingLogs: Description: Log level for DagProcessing Type: String Default: INFO SchedulerLogsLevel: Description: Log level for SchedulerLogs Type: String Default: INFO TaskLogsLevel: Description: Log level for TaskLogs Type: String Default: INFO WorkerLogsLevel: Description: Log level for WorkerLogs Type: String Default: INFO WebserverLogsLevel: Description: Log level for WebserverLogs Type: String Default: INFO Resources: ##################################################################################################################### # CREATE VPC ##################################################################################################################### VPC: Type: AWS::EC2::VPC Properties: CidrBlock: !Ref VpcCIDR EnableDnsSupport: true EnableDnsHostnames: true Tags: - Key: Name Value: MWAAEnvironment InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: MWAAEnvironment InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PublicSubnet1CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PublicSubnet2CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ2) PrivateSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet1CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ1) PrivateSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet2CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ2) NatGateway1EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway2EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway1: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway1EIP.AllocationId SubnetId: !Ref PublicSubnet1 NatGateway2: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway2EIP.AllocationId SubnetId: !Ref PublicSubnet2 PublicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway PublicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet1 PublicSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet2 PrivateRouteTable1: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ1) DefaultPrivateRoute1: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable1 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway1 PrivateSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable1 SubnetId: !Ref PrivateSubnet1 PrivateRouteTable2: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ2) DefaultPrivateRoute2: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable2 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway2 PrivateSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable2 SubnetId: !Ref PrivateSubnet2 SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupName: "mwaa-security-group" GroupDescription: "Security group with a self-referencing inbound rule." VpcId: !Ref VPC SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup EnvironmentBucket: Type: AWS::S3::Bucket Properties: VersioningConfiguration: Status: Enabled PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true ##################################################################################################################### # CREATE MWAA ##################################################################################################################### MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn DagS3Path: dags/ NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true MwaaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17&TCX5-2025-waiver; Statement: - Effect: Allow Principal: Service: - airflow-env.amazonaws.com - airflow.amazonaws.com Action: - "sts:AssumeRole" Path: "/service-role/" MwaaExecutionPolicy: DependsOn: EnvironmentBucket Type: AWS::IAM::ManagedPolicy Properties: Roles: - !Ref MwaaExecutionRole PolicyDocument: Version: 2012-10-17&TCX5-2025-waiver; Statement: - Effect: Allow Action: airflow:PublishMetrics Resource: - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${EnvironmentName}" - Effect: Deny Action: s3:ListAllMyBuckets Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - "s3:GetObject*" - "s3:GetBucket*" - "s3:List*" Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - logs:DescribeLogGroups Resource: "*" - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents - logs:GetLogEvents - logs:GetLogRecord - logs:GetLogGroupFields - logs:GetQueryResults - logs:DescribeLogGroups Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*" - Effect: Allow Action: cloudwatch:PutMetricData Resource: "*" - Effect: Allow Action: - sqs:ChangeMessageVisibility - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:GetQueueUrl - sqs:ReceiveMessage - sqs:SendMessage Resource: - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*" - Effect: Allow Action: - kms:Decrypt - kms:DescribeKey - "kms:GenerateDataKey*" - kms:Encrypt NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*" Condition: StringLike: "kms:ViaService": - !Sub "sqs.${AWS::Region}.amazonaws.com" Outputs: VPC: Description: A reference to the created VPC Value: !Ref VPC PublicSubnets: Description: A list of the public subnets Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]] PrivateSubnets: Description: A list of the private subnets Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]] PublicSubnet1: Description: A reference to the public subnet in the 1st Availability Zone Value: !Ref PublicSubnet1 PublicSubnet2: Description: A reference to the public subnet in the 2nd Availability Zone Value: !Ref PublicSubnet2 PrivateSubnet1: Description: A reference to the private subnet in the 1st Availability Zone Value: !Ref PrivateSubnet1 PrivateSubnet2: Description: A reference to the private subnet in the 2nd Availability Zone Value: !Ref PrivateSubnet2 SecurityGroupIngress: Description: Security group with self-referencing inbound rule Value: !Ref SecurityGroupIngress MwaaApacheAirflowUI: Description: MWAA Environment Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"
Paso dos: cree la pila con la AWS CLI
-
En el símbolo del sistema, vaya hasta el directorio en el que está almacenado
mwaa-public-network.yml. Por ejemplo:cd mwaaproject -
Utilice el comando
aws cloudformation create-stackpara crear la pila con la AWS CLI.aws cloudformation create-stack --stack-name mwaa-environment-public-network --template-body file://mwaa-public-network.yml --capabilities CAPABILITY_IAMnota
Toma más de 30 minutos crear la infraestructura de Amazon VPC, el bucket de Amazon S3 y el entorno de Amazon MWAA.
Paso tres: cargue un DAG en Amazon S3 y ejecútelo en la interfaz de usuario de Apache Airflow
-
Copie el contenido del archivo
tutorial.pyde la última versión compatible de Apache Airflowy guárdelo localmente como tutorial.py. -
En el símbolo del sistema, vaya hasta el directorio en el que está almacenado
tutorial.py. Por ejemplo:cd mwaaproject -
Use el siguiente comando para obtener una lista de todos los buckets de Amazon S3.
aws s3 ls -
Utilice el comando siguiente para enumerar los archivos y las carpetas del bucket de Amazon S3 para su entorno.
aws s3 ls s3://YOUR_S3_BUCKET_NAME -
Utilice el siguiente script para cargar el archivo
tutorial.pyen su carpetadags. Reemplace el valor de muestra enamzn-s3-demo-bucket.aws s3 cp tutorial.py s3://amzn-s3-demo-bucket/dags/ -
Abra la página Entornos
en la consola de Amazon MWAA. -
Seleccione un entorno.
-
Elija Abrir interfaz de usuario de Airflow.
-
En la interfaz de usuario de Apache Airflow, de la lista de DAG disponibles, elija el DAG tutorial.
-
En la página de detalles del DAG, seleccione la opción Pausar/Reanudar DAG que aparece junto al nombre del DAG para reanudar el DAG.
-
Elija Desencadenar DAG.
Paso cuatro: vea los registros en CloudWatch Logs
Puede ver los registros de Apache Airflow en la consola de CloudWatch para todos los registros de Apache Airflow que estaban habilitados por la pila CloudFormation. En la siguiente sección, se muestra cómo ver los registros del grupo de registros del servidor web de Airflow.
-
Abra la página Entornos
en la consola de Amazon MWAA. -
Seleccione un entorno.
-
Elija el grupo de registro del servidor web de Airflow en el panel de monitoreo.
-
Seleccione el registro
webserver_console_ipen los flujos de registro.
Siguientes pasos
-
Obtenga más información sobre cómo cargar los DAG, especificar las dependencias de Python en
requirements.txty personalizar plugins en unplugins.zipen Trabajo con DAG en Amazon MWAA. -
Obtenga más información sobre las mejores prácticas que recomendamos para ajustar el rendimiento de su entorno Ajuste del desempeño de Apache Airflow en Amazon MWAA.
-
Cree un panel de supervisión para su entorno en Monitorización de paneles y alarmas en Amazon MWAA.
-
Ejecute algunos de los ejemplos de código de DAG en Códigos de ejemplo de Amazon Managed Workflows para Apache Airflow.