Using Amazon MWAA with Amazon EKS
The following sample demonstrates how to use Amazon Managed Workflows for Apache Airflow with Amazon EKS.
Version
Prerequisites
To use the example in this topic, you need the following:
When you use eksctl commands, you can include --profile to specify a profile other than the default.
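For example, a minimal sketch that assumes a named profile called dev-profile exists in your AWS credentials file:
# Sketch only: dev-profile is a placeholder for a named profile in your AWS credentials file.
eksctl get cluster --profile dev-profile --region us-west-2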
Create a public key for Amazon EC2
Use the following command to create a public key from your private key pair.
ssh-keygen -y -f myprivatekey.pem > mypublickey.pub
To learn more, see Retrieve the public key for your key pair.
Create the cluster
Use the following command to create the cluster. If you want a custom name for the cluster, or want to create it in a different Region, replace the name and Region values. You must create the cluster in the same Region where you create the Amazon MWAA environment. Replace the values for the subnets to match the subnets in the Amazon VPC network that you use for Amazon MWAA. Replace the value for ssh-public-key to match the key you use. You can use an existing key from Amazon EC2 that is located in the same Region, or create a new key in the same Region where you create your Amazon MWAA environment.
eksctl create cluster \
--name mwaa-eks \
--region us-west-2 \
--version 1.18 \
--nodegroup-name linux-nodes \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--with-oidc \
--ssh-access \
--ssh-public-key MyPublicKey \
--managed \
--vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \
--vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"
It takes some time to finish creating the cluster. Once complete, you can verify that the cluster was created successfully, and has an IAM OIDC provider configured, using the following command:
eksctl utils associate-iam-oidc-provider \
--region us-west-2 \
--cluster mwaa-eks \
--approve
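Optionally, you can also list the cluster to check its status. The following is a sketch that assumes the cluster name and Region used above:
eksctl get cluster --name mwaa-eks --region us-west-2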
Create the mwaa namespace
After confirming that the cluster was created successfully, use the following command to create a namespace for the pods.
kubectl create namespace mwaa
Create a role for the mwaa namespace
After you create the namespace, create a role and role binding for an Amazon MWAA user on EKS that can run pods in the MWAA namespace. If you used a different name for the namespace, replace mwaa in -n mwaa with the name that you used.
cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role
rules:
  - apiGroups:
      - ""
      - "apps"
      - "batch"
      - "extensions"
    resources:
      - "jobs"
      - "pods"
      - "pods/attach"
      - "pods/exec"
      - "pods/log"
      - "pods/portforward"
      - "secrets"
      - "services"
    verbs:
      - "create"
      - "delete"
      - "describe"
      - "get"
      - "list"
      - "patch"
      - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
subjects:
- kind: User
  name: mwaa-service
roleRef:
  kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
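To confirm that the role and role binding were created, you can optionally list them in the namespace (a sketch, assuming the mwaa namespace and names above):
kubectl get role,rolebinding -n mwaa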
Run the following command to confirm that the new role can access the Amazon EKS cluster. Be sure to use the correct name if you did not use mwaa:
kubectl get pods -n mwaa --as mwaa-service
You should see a message returned that says:
No resources found in mwaa namespace.
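As an additional optional check, kubectl auth can-i can test a specific permission while impersonating the mwaa-service user (a sketch; not required by this walkthrough):
kubectl auth can-i create pods -n mwaa --as mwaa-service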
Create and attach an IAM role for the Amazon EKS cluster
You must create an IAM role, then bind it to the Amazon EKS (k8s) cluster so that it can be used for authentication through IAM. The role is used only for signing in to the cluster, and does not have any permissions for console or API calls.
Create a new role for the Amazon MWAA environment using the steps in Amazon MWAA execution role. However, instead of creating and attaching the policies described in that topic, attach the following policy:
- JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:environment/${MWAA_ENV_NAME}"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::{MWAA_S3_BUCKET}",
                "arn:aws:s3:::{MWAA_S3_BUCKET}/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults",
                "logs:DescribeLogGroups"
            ],
            "Resource": [
                "arn:aws:logs:us-east-1:111122223333:log-group:airflow-${MWAA_ENV_NAME}-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:us-east-1:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111122223333:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                        "sqs.us-east-1.amazonaws.com"
                    ]
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "eks:DescribeCluster"
            ],
            "Resource": "arn:aws:eks:us-east-1:111122223333:cluster/${EKS_CLUSTER_NAME}"
        }
    ]
}
After you create the role, edit your Amazon MWAA environment to use the role you created as the environment's execution role. To change the role, edit the environment that you want to use. You select the execution role under Permissions.
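If you prefer the AWS CLI over the console, the execution role can also be changed with update-environment. The following is a sketch only; the environment name, account ID, and role name are placeholders that you must replace with your own values:
# Sketch only: MyAirflowEnvironment and the role ARN are placeholders.
aws mwaa update-environment \
--name MyAirflowEnvironment \
--execution-role-arn arn:aws:iam::111122223333:role/mwaa-execution-role \
--region us-east-1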
Known issues:
- There is a known issue with role ARNs with subpaths not being able to authenticate with Amazon EKS. The workaround is to create the service role manually rather than using the one created by Amazon MWAA itself. To learn more, see Roles with paths do not work when the path is included in their ARN in the aws-auth configmap.
- If the Amazon MWAA service is not listed in IAM, you need to choose an alternate service policy, such as Amazon EC2, and then update the role's trust policy to match the following:
- JSON
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "airflow-env.amazonaws.com",
                    "airflow.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
To learn more, see How to use trust policies with IAM roles.
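If you update the trust policy from the AWS CLI rather than the console, a sketch such as the following can be used, assuming the role is named mwaa-execution-role and the trust policy above is saved locally as airflow-trust-policy.json:
# Sketch only: assumes the trust policy above is saved locally as airflow-trust-policy.json.
aws iam update-assume-role-policy \
--role-name mwaa-execution-role \
--policy-document file://airflow-trust-policy.json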
Create the requirements.txt file
To use the sample code in this section, ensure that you have added one of the following options to your requirements.txt, then upload the file to your environment's bucket (a sketch follows the options below). To learn more, see Installing Python dependencies.
- Apache Airflow v2
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0
- Apache Airflow v1
awscli
kubernetes==12.0.1
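After editing requirements.txt, upload it to the Amazon S3 bucket for your environment so that Amazon MWAA can install the dependencies. The following is a sketch that assumes a hypothetical bucket name and that your environment is already configured to read requirements.txt from that path; replace the bucket name with your own:
# Sketch only: your-mwaa-bucket is a placeholder for your environment's bucket.
aws s3 cp requirements.txt s3://your-mwaa-bucket/requirements.txt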
Create an identity mapping for Amazon EKS
Use the ARN for the role you created in the following command to create an identity mapping for Amazon EKS. Change the Region us-east-1 to the Region where you created your environment. Replace the ARN for the role, and finally, replace mwaa-execution-role with your environment's execution role.
eksctl create iamidentitymapping \
--region us-east-1 \
--cluster mwaa-eks \
--arn arn:aws:iam::123456789012:role/mwaa-execution-role \
--username mwaa-service
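You can optionally confirm the mapping with eksctl (a sketch using the same cluster name and Region as above):
eksctl get iamidentitymapping --cluster mwaa-eks --region us-east-1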
Create the kubeconfig
Use the following command to create the kubeconfig:
aws eks update-kubeconfig \
--region us-west-2 \
--kubeconfig ./kube_config.yaml \
--name mwaa-eks \
--alias aws
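To optionally confirm that the aws context alias was written to the file, you can list the contexts in the generated kubeconfig (a sketch):
kubectl config get-contexts --kubeconfig ./kube_config.yaml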
If you used a specific profile when you ran update-kubeconfig, you need to remove the env: section that was added to the kube_config.yaml file so that it works correctly with Amazon MWAA. To do so, delete the following from the file and then save it:
env:
- name: AWS_PROFILE
  value: profile_name
Create a DAG
Use the following code sample to create a Python file, such as mwaa_pod_example.py, for the DAG.
- Apache Airflow v2
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws'
)
- Apache Airflow v1
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

#use a kube_config stored in s3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    in_cluster=False,
    cluster_context='aws'
)
Add the DAG and kube_config.yaml to the Amazon S3 bucket
Put the DAG you created and the kube_config.yaml file into the Amazon S3 bucket for your Amazon MWAA environment. You can put the files into your bucket using either the Amazon S3 console or the AWS Command Line Interface.
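For example, with the AWS CLI, a sketch such as the following copies both files into the dags folder of the bucket, which matches the kube_config_path used in the DAG. The bucket name is a placeholder; replace it with your environment's bucket:
# Sketch only: your-mwaa-bucket is a placeholder for your environment's bucket.
aws s3 cp mwaa_pod_example.py s3://your-mwaa-bucket/dags/mwaa_pod_example.py
aws s3 cp kube_config.yaml s3://your-mwaa-bucket/dags/kube_config.yaml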
Enable and trigger the example
In Apache Airflow, enable the example and then trigger it.
After it runs and completes successfully, use the following command to verify the pods:
kubectl get pods -n mwaa
You should see output similar to the following:
NAME READY STATUS RESTARTS AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s
You can then verify the output of the pod with the following command. Replace the name value with the value returned from the previous command:
kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888