Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Connessione ad Amazon ECS tramite ECSOperator
Questo argomento descrive come utilizzare il ECSOperator per connettersi a un container Amazon Elastic Container Service (Amazon ECS) da Amazon MWAA. Nei passaggi seguenti, aggiungerai le autorizzazioni necessarie al ruolo di esecuzione del tuo ambiente, utilizzerai un AWS CloudFormation modello per creare un cluster Amazon ECS Fargate e infine creerai e caricherai un DAG che si connette al tuo nuovo cluster.
Versione
È possibile utilizzare l'esempio di codice in questa pagina con Apache Airflow v2 in Python 3.10 e Apache Airflow v3in Python 3.11
Prerequisiti
Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
-
Un ambiente Amazon MWAA.
Autorizzazioni
-
Il ruolo di esecuzione per il tuo ambiente richiede l'autorizzazione per eseguire attività in Amazon ECS. Puoi allegare la politica FullAccess AWS gestita da AmazonECS_
al tuo ruolo di esecuzione oppure creare e allegare la seguente politica al tuo ruolo di esecuzione. -
Oltre ad aggiungere le autorizzazioni necessarie per eseguire attività in Amazon ECS, devi anche modificare l'informativa sulla politica CloudWatch Logs nel tuo ruolo di esecuzione Amazon MWAA per consentire l'accesso al gruppo di log di attività di Amazon ECS come elencato di seguito. Il gruppo di log di Amazon ECS viene creato dal AWS CloudFormation modello inCrea un cluster Amazon ECS.
{ "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:us-east-1:123456789012:log-group:airflow-environment-name-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*" ] }
Per ulteriori informazioni sul ruolo di esecuzione di Amazon MWAA e su come allegare una policy, consulta. Ruolo di esecuzione
Crea un cluster Amazon ECS
Utilizzando il AWS CloudFormation modello seguente, creerai un cluster Amazon ECS Fargate da utilizzare con il tuo flusso di lavoro Amazon MWAA. Per ulteriori informazioni, consulta la sezione Creazione di una definizione di attività nella Amazon Elastic Container Service Developer Guide.
-
Crea un file JSON con il codice seguente e salvalo con nome.
ecs-mwaa-cfn.json{ "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } } -
Nel prompt dei comandi, usa il seguente AWS CLI comando per creare un nuovo stack. Devi sostituire i valori
SecurityGroupseSubnetIdscon i valori per i gruppi di sicurezza e le sottoreti del tuo ambiente Amazon MWAA.aws cloudformation create-stack \ --stack-namemy-ecs-stack--template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group\ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1\ --capabilities CAPABILITY_IAMIn alternativa, puoi utilizzare il seguente script di shell. Lo script recupera i valori richiesti per i gruppi di sicurezza e le sottoreti dell'ambiente utilizzando il
get-environmentAWS CLI comando, quindi crea lo stack di conseguenza. Per eseguire lo script, procedi come segue.-
Copiate e salvate lo script
ecs-stack-helper.shnella stessa directory del AWS CloudFormation modello.#!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM -
Esegui lo script utilizzando i seguenti comandi. Sostituisci
environment-nameestack-namecon le tue informazioni.chmod +x ecs-stack-helper.sh./ecs-stack-helper.bashenvironment-namestack-name
In caso di successo, farai riferimento al seguente output che mostra il nuovo ID AWS CloudFormation dello stack.
{ "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9" } -
Una volta completato AWS CloudFormation lo stack e AWS aver effettuato il provisioning delle risorse Amazon ECS, sei pronto per creare e caricare il tuo DAG.
Esempio di codice
-
Apri un prompt dei comandi e accedi alla directory in cui è archiviato il codice DAG. Ad esempio:
cd dags -
Copia il contenuto del seguente esempio di codice e salvalo localmente con nome
mwaa-ecs-operator.py, quindi carica il nuovo DAG su Amazon S3.from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )Nota
Nell'esempio DAG, for
awslogs_group, potrebbe essere necessario modificare il gruppo di log con il nome del gruppo di task log di Amazon ECS. L'esempio presuppone un gruppo di log denominato.mwaa-ecs-zeroPerawslogs_stream_prefix, usa il prefisso del flusso del task log di Amazon ECS. L'esempio presuppone un prefisso del flusso di log,.ecs -
Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow.
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
In caso di successo, otterrete un output simile al seguente nei log delle attività del DAG:
ecs_operator_taskecs_fargate_dag[2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task - Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides: {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]} . . . [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935 [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52 [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 7 Jun 13 18:51 bin -> usr/bin [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 2 root root 4096 Apr 9 2019 boot [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 5 root root 340 Jul 19 17:54 dev [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 1 root root 4096 Jul 19 17:54 etc [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 home [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 7 Jun 13 18:51 lib -> usr/lib [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 9 Jun 13 18:51 lib64 -> usr/lib64 [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Jun 13 18:51 local [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 media [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 mnt [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 opt [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root 0 Jul 19 17:54 proc [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\- 2 root root 4096 Apr 9 2019 root [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Jun 13 18:52 run [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 8 Jun 13 18:51 sbin -> usr/sbin [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 srv [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 13 root root 0 Jul 19 17:54 sys [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt 2 root root 4096 Jun 13 18:51 tmp [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 13 root root 4096 Jun 13 18:51 usr [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 18 root root 4096 Jun 13 18:52 var . . . [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed