Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esecuzione di un'applicazione Flink
Con Amazon EMR 6.13.0 e rilasci successivi, puoi eseguire un'applicazione Flink con l'operatore Flink Kubernetes in modalità Applicazione su Amazon EMR su EKS. Con Amazon EMR 6.15.0 e rilasci successivi, puoi anche eseguire un'applicazione Flink in modalità Sessione. Questa pagina descrive entrambi i metodi che puoi utilizzare per eseguire un'applicazione Flink con Amazon EMR su EKS.
Devi disporre di un bucket Amazon S3 creato per archiviare i metadati ad alta disponibilità del processo quando invii il processo Flink. Se non desideri utilizzare questa funzionalità, puoi disattivarla. È abilitata per impostazione predefinita.
Prerequisito: per poter eseguire un'applicazione Flink con l'operatore Flink Kubernetes, completa le fasi indicate in Configurazione dell'operatore Flink Kubernetes per Amazon EMR su EKS e Installa l'operatore Kubernetes.
- Application mode
-
Con Amazon EMR 6.13.0 e rilasci successivi, puoi eseguire un'applicazione Flink con l'operatore Flink Kubernetes in modalità Applicazione su Amazon EMR su EKS.
-
Crea un file di FlinkDeployment
definizione basic-example-app-cluster.yaml
come nell'esempio seguente. Se hai attivato e utilizzi uno degli opt-in Regioni AWS, assicurati di decommentare e configurare la configurazione. fs.s3a.endpoint.region
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example-app-cluster
spec:
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
#fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
flinkVersion: v1_17
executionRoleArn: JOB_EXECUTION_ROLE_ARN
emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
jobManager:
storageDir: HIGH_AVAILABILITY_STORAGE_PATH
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
job:
# if you have your job jar in S3 bucket you can use that path as well
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
upgradeMode: savepoint
savepointTriggerNonce: 0
monitoringConfiguration:
cloudWatchMonitoringConfiguration:
logGroupName: LOG_GROUP_NAME
-
Invia l'implementazione Flink con il comando seguente. L'operazione creerà anche un oggetto FlinkDeployment
denominato basic-example-app-cluster
.
kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE>
-
Accedi all'interfaccia utente Flink.
kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
-
Apri localhost:8081
per visualizzare localmente i tuoi processi Flink.
-
Ripulisci il processo. Ricordati di ripulire gli artefatti di S3 che sono stati creati per questo lavoro, come i metadati di checkpoint, l'alta disponibilità, i savepointing e i log. CloudWatch
Per ulteriori informazioni sull'invio di applicazioni a Flink tramite l'operatore Flink Kubernetes, consulta Esempi di operatori Flink Kubernetes nella cartella on. apache/flink-kubernetes-operator
GitHub
- Session mode
-
Con Amazon EMR 6.15.0 e rilasci successivi, puoi eseguire un'applicazione Flink con l'operatore Flink Kubernetes in modalità Sessione su Amazon EMR su EKS.
-
Create un file di definizione denominato come nell'esempio seguente. FlinkDeployment
basic-example-app-cluster.yaml
Se hai attivato e utilizzi uno degli opt-in Regioni AWS, assicurati di decommentare e configurare la configurazione. fs.s3a.endpoint.region
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example-session-cluster
spec:
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
#fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
flinkVersion: v1_17
executionRoleArn: JOB_EXECUTION_ROLE_ARN
emrReleaseLabel: "emr-6.15.0
-flink-latest"
jobManager:
storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
monitoringConfiguration:
s3MonitoringConfiguration:
logUri:
cloudWatchMonitoringConfiguration:
logGroupName: LOG_GROUP_NAME
-
Invia l'implementazione Flink con il comando seguente. L'operazione creerà anche un oggetto FlinkDeployment
denominato basic-example-session-cluster
.
kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
Usa il seguente comando per confermare che il cluster di sessione LIFECYCLE
è STABLE
:
kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
L'output visualizzato dovrebbe essere come il seguente esempio:
NAME JOB STATUS LIFECYCLE STATE
basic-example-session-cluster STABLE
Crea un file di risorse di definizione personalizzato FlinkSessionJob
basic-session-job.yaml
con il seguente contenuto di esempio:
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: basic-session-job
spec:
deploymentName: basic-session-deployment
job:
# If you have your job jar in an S3 bucket you can use that path.
# To use jar in S3 bucket, set
# OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN
)
# when you install Spark operator
jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
parallelism: 2
upgradeMode: stateless
Invia il processo della sessione Flink con il comando seguente. L'operazione creerà un oggetto FlinkSessionJob
denominato basic-session-job
.
kubectl apply -f basic-session-job.yaml -n $NAMESPACE
Utilizza il comando seguente per confermare che il cluster di sessione LIFECYCLE
è STABLE
e JOB STATUS
è RUNNING
:
kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
L'output visualizzato dovrebbe essere come il seguente esempio:
NAME JOB STATUS LIFECYCLE STATE
basic-example-session-cluster RUNNING STABLE
-
Accedi all'interfaccia utente Flink.
kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
-
Apri localhost:8081
per visualizzare localmente i tuoi processi Flink.
-
Ripulisci il processo. Ricordati di ripulire gli artefatti di S3 che sono stati creati per questo lavoro, come i metadati di checkpoint, l'alta disponibilità, i savepointing e i log. CloudWatch