Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Herstellen einer Verbindung zu Amazon ECS mithilfe der ECSOperator
In diesem Thema wird beschrieben, wie Sie den verwenden könnenECSOperator
, um eine Verbindung zu einem Amazon Elastic Container Service (Amazon ECS) -Container von Amazon MWAA herzustellen. In den folgenden Schritten fügen Sie der Ausführungsrolle Ihrer Umgebung die erforderlichen Berechtigungen hinzu, verwenden eine AWS CloudFormation Vorlage, um einen Amazon ECS Fargate-Cluster zu erstellen, und laden schließlich eine DAG hoch, die eine Verbindung zu Ihrem neuen Cluster herstellt.
Version
-
Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10
verwenden.
Voraussetzungen
Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
-
Eine Amazon MWAA-Umgebung.
Berechtigungen
-
Die Ausführungsrolle für Ihre Umgebung benötigt die Erlaubnis, Aufgaben in Amazon ECS auszuführen. Sie können entweder die von AmazonECS_ FullAccess
AWS verwaltete Richtlinie an Ihre Ausführungsrolle anhängen oder die folgende Richtlinie erstellen und an Ihre Ausführungsrolle anhängen. { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "ecs:RunTask", "ecs:DescribeTasks" ], "Resource": "*" }, { "Action": "iam:PassRole", "Effect": "Allow", "Resource": [ "*" ], "Condition": { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } } } ] }
-
Sie müssen nicht nur die erforderlichen Berechtigungen für die Ausführung von Aufgaben in Amazon ECS hinzufügen, sondern auch die CloudWatch Protokollrichtlinie in Ihrer Amazon MWAA-Ausführungsrolle ändern, um den Zugriff auf die Amazon ECS-Aufgabenprotokollgruppe zu ermöglichen, wie im Folgenden dargestellt. Die Amazon ECS-Protokollgruppe wird durch die AWS CloudFormation Vorlage in erstelltErstellen Sie einen Amazon ECS-Cluster.
{ "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:
region
:account-id
:log-group:airflow-environment-name
-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group
:*" ] }
Weitere Informationen zur Amazon MWAA-Ausführungsrolle und zum Anhängen einer Richtlinie finden Sie unter. Ausführungsrolle
Erstellen Sie einen Amazon ECS-Cluster
Mithilfe der folgenden AWS CloudFormation Vorlage erstellen Sie einen Amazon ECS Fargate-Cluster zur Verwendung mit Ihrem Amazon MWAA-Workflow. Weitere Informationen finden Sie unter Erstellen einer Aufgabendefinition im Amazon Elastic Container Service Developer Guide.
-
Erstellen Sie eine JSON-Datei mit dem folgenden Code und speichern Sie sie unter
ecs-mwaa-cfn.json
.{ "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } }
-
Verwenden Sie in Ihrer Befehlszeile den folgenden AWS CLI Befehl, um einen neuen Stack zu erstellen. Sie müssen die Werte
SecurityGroups
undSubnetIds
durch Werte für die Sicherheitsgruppen und Subnetze Ihrer Amazon MWAA-Umgebung ersetzen.$
aws cloudformation create-stack \ --stack-name
my-ecs-stack
--template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group
\ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1
\\,your-mwaa-subnet-1
\ --capabilities CAPABILITY_IAMAlternativ können Sie das folgende Shell-Skript verwenden. Das Skript ruft mithilfe des
get-environment
AWS CLI Befehls die erforderlichen Werte für die Sicherheitsgruppen und Subnetze Ihrer Umgebung ab und erstellt dann den Stack entsprechend. Gehen Sie wie folgt vor, um das Skript auszuführen.-
Kopieren und speichern Sie das Skript
ecs-stack-helper.sh
in demselben Verzeichnis wie Ihre AWS CloudFormation Vorlage.#!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM
-
Führen Sie das Skript mit den folgenden Befehlen aus. Ersetzen Sie
environment-name
undstack-name
durch Ihre Informationen.$
chmod +x ecs-stack-helper.sh
$
./ecs-stack-helper.bash
environment-name
stack-name
Bei Erfolg wird die folgende Ausgabe angezeigt, in der Ihre neue AWS CloudFormation Stack-ID angezeigt wird.
{ "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9" }
-
Nachdem Ihr AWS CloudFormation Stack AWS fertiggestellt und Ihre Amazon ECS-Ressourcen bereitgestellt wurden, können Sie Ihre DAG erstellen und hochladen.
Codebeispiel
-
Öffnen Sie eine Befehlszeile und navigieren Sie zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Zum Beispiel:
cd dags
-
Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter. Laden Sie dann Ihre neue DAG auf Amazon S3 hoch.
mwaa-ecs-operator.py
from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )
Anmerkung
In der Beispiel-DAG für
awslogs_group
müssen Sie möglicherweise die Protokollgruppe mit dem Namen Ihrer Amazon ECS-Aufgabenprotokollgruppe ändern. Das Beispiel geht von einer Protokollgruppe mit dem Namen ausmwaa-ecs-zero
. Verwenden Sie fürawslogs_stream_prefix
das Amazon ECS-Aufgabenprotokoll-Stream-Präfix. Das Beispiel geht von einem Log-Stream-Präfix aus,ecs
. -
Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow-Benutzeroberfläche aus.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
Bei Erfolg erhalten Sie in den Aufgabenprotokollen für die
ecs_fargate_dag
DAG eine Ausgabe, dieecs_operator_task
der folgenden ähnelt:[2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task - Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides: {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]} . . . [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935 [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52 [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 7 Jun 13 18:51 bin -> usr/bin [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 2 root root 4096 Apr 9 2019 boot [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 5 root root 340 Jul 19 17:54 dev [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 1 root root 4096 Jul 19 17:54 etc [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 home [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 7 Jun 13 18:51 lib -> usr/lib [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 9 Jun 13 18:51 lib64 -> usr/lib64 [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Jun 13 18:51 local [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 media [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 mnt [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 opt [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root 0 Jul 19 17:54 proc [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\- 2 root root 4096 Apr 9 2019 root [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Jun 13 18:52 run [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 8 Jun 13 18:51 sbin -> usr/sbin [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 srv [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 13 root root 0 Jul 19 17:54 sys [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt 2 root root 4096 Jun 13 18:51 tmp [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 13 root root 4096 Jun 13 18:51 usr [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 18 root root 4096 Jun 13 18:52 var . . . [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed