Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Creazione di una connessione SSH utilizzando SSHOperator
L'esempio seguente descrive come utilizzare il SSHOperator in un grafo aciclico diretto (DAG) per connetterti a un' EC2 istanza Amazon remota dal tuo ambiente Amazon Managed Workflows for Apache Airflow. Puoi utilizzare un approccio simile per connetterti a qualsiasi istanza remota con accesso SSH.
Nell'esempio seguente, carichi una chiave segreta SSH (.pem) dags nella directory del tuo ambiente su Amazon S3. Quindi, installi le dipendenze necessarie utilizzando requirements.txt e creando una nuova connessione Apache Airflow nell'interfaccia utente. Infine, scrivi un DAG che crea una connessione SSH all'istanza remota.
Argomenti
Versione
È possibile utilizzare l'esempio di codice in questa pagina con Apache Airflow v2 in Python 3.10 e Apache Airflow v3in Python 3.11
Prerequisiti
Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
-
Un ambiente Amazon MWAA.
-
Una chiave segreta SSH. L'esempio di codice presuppone che tu abbia un' EC2 istanza Amazon e una
.pemnella stessa regione del tuo ambiente Amazon MWAA. Se non disponi di una chiave, consulta la sezione Creare o importare una coppia di chiavi nella Amazon EC2 User Guide.
Autorizzazioni
Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.
Requisiti
Aggiungi il seguente parametro per requirements.txt installare il apache-airflow-providers-ssh pacchetto sul server web. Una volta aggiornato l'ambiente e dopo che Amazon MWAA avrà installato correttamente la dipendenza, otterrai un nuovo tipo di connessione SSH nell'interfaccia utente.
-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
Nota
-cdefinisce l'URL dei vincoli in. requirements.txt Ciò garantisce che Amazon MWAA installi la versione del pacchetto corretta per il tuo ambiente.
Copia la tua chiave segreta su Amazon S3
Usa il seguente AWS Command Line Interface comando per copiare la .pem chiave dags nella directory del tuo ambiente in Amazon S3.
aws s3 cpyour-secret-key.pem s3://amzn-s3-demo-bucket/dags/
Amazon MWAA copia il contenutodags, inclusa la .pem chiave, nella /usr/local/airflow/dags/ directory locale. In questo modo, Apache Airflow può accedere alla chiave.
Creare una nuova connessione Apache Airflow
Per creare una nuova connessione SSH utilizzando l'interfaccia utente di Apache Airflow
-
Apri la pagina Ambienti
sulla console Amazon MWAA. -
Dall'elenco degli ambienti, scegli Open Airflow UI per il tuo ambiente.
-
Nella pagina dell'interfaccia utente di Apache Airflow, scegli Amministratore dalla barra di navigazione principale per espandere l'elenco a discesa, quindi scegli Connessioni.
-
Nella pagina Elenco connessioni, scegli + o il pulsante Aggiungi un nuovo record per aggiungere una nuova connessione.
-
Nella pagina Aggiungi connessione, aggiungi le seguenti informazioni:
-
Per ID di connessione, immettere
ssh_new. -
Per Tipo di connessione, scegli SSH dall'elenco a discesa.
Nota
Se il tipo di connessione SSH non è disponibile nell'elenco, Amazon MWAA non ha installato il pacchetto richiesto.
apache-airflow-providers-sshAggiorna ilrequirements.txtfile per includere questo pacchetto, quindi riprova. -
Per Host, inserisci l'indirizzo IP dell' EC2 istanza Amazon a cui desideri connetterti. Ad esempio,
12.345.67.89. -
Per Nome utente, inserisci
ec2-userse ti stai connettendo a un' EC2 istanza Amazon. Il tuo nome utente potrebbe essere diverso, a seconda del tipo di istanza remota a cui desideri connettere Apache Airflow. -
Per Extra, inserisci la seguente coppia chiave-valore in formato JSON:
{ "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }Questa coppia chiave-valore indica ad Apache Airflow di cercare la chiave segreta nella directory locale.
/dags
-
Esempio di codice
Il seguente DAG utilizza il SSHOperator per connettersi all' EC2istanza Amazon di destinazione, quindi esegue il comando hostname Linux per stampare il nome dell'istanza. È possibile modificare il DAG per eseguire qualsiasi comando o script sull'istanza remota.
-
Apri un terminale e vai alla directory in cui è memorizzato il codice DAG. Ad esempio:
cd dags -
Copiate il contenuto del seguente esempio di codice e salvatelo localmente con nome.
ssh.pyfrom airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag() -
Esegui il AWS CLI comando seguente per copiare il DAG nel bucket del tuo ambiente, quindi attiva il DAG utilizzando l'interfaccia utente di Apache Airflow.
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
In caso di successo, otterrete un output simile al seguente nei log delle attività del DAG:
ssh_taskssh_operator_example[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916