Run a Flink application
With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Application mode on Amazon EMR on EKS. With Amazon EMR 6.15.0 and higher, you can also run a Flink application in Session mode. This page describes both methods that you can use to run a Flink application with Amazon EMR on EKS.
Note
You must have an Amazon S3 bucket created to store the high-availability metadata when you submit your Flink job. If you don’t want to use this feature, you can disable it. It's enabled by default.
Prerequisite – Before you can run a Flink application with the Flink Kubernetes operator, complete the steps in Setting up the Flink Kubernetes operator for Amazon EMR on EKS and Install the Kubernetes operator.
Application mode

With Amazon EMR 6.13.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Application mode on Amazon EMR on EKS.

Create a `FlinkDeployment` definition file named `basic-example-app-cluster.yaml`, as in the following example. If you activated and use one of the opt-in AWS Regions, make sure that you uncomment and configure `fs.s3a.endpoint.region`.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example-app-cluster
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
    state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
    state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
  flinkVersion: v1_17
  executionRoleArn: JOB_EXECUTION_ROLE_ARN
  emrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher
  jobManager:
    storageDir: HIGH_AVAILABILITY_STORAGE_PATH
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # if you have your job jar in an S3 bucket, you can use that path as well
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    savepointTriggerNonce: 0
  monitoringConfiguration:
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG_GROUP_NAME
```
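The manifest's `taskmanager.numberOfTaskSlots: "2"` and `parallelism: 2` interact through Flink's slot-based scheduling: the operator needs enough TaskManager pods that their combined slots cover the job parallelism, that is, ceil(parallelism / slots per TaskManager). A minimal sketch of that arithmetic (helper name is ours, not a Flink API):

```python
import math

def task_managers_needed(parallelism: int, slots_per_tm: int) -> int:
    """Number of TaskManager pods whose combined slots cover the job parallelism."""
    return math.ceil(parallelism / slots_per_tm)

# Values from the example manifest above: 2 slots per TaskManager, parallelism 2
print(task_managers_needed(parallelism=2, slots_per_tm=2))  # 1 TaskManager
print(task_managers_needed(parallelism=5, slots_per_tm=2))  # 3 TaskManagers
```

Sizing `taskManager.resource` therefore budgets memory and CPU per group of 2 parallel subtasks, not per subtask.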
Submit the Flink deployment with the following command. This also creates a `FlinkDeployment` object named `basic-example-app-cluster`.

```
kubectl create -f basic-example-app-cluster.yaml -n NAMESPACE
```
Access the Flink UI:

```
kubectl port-forward deployments/basic-example-app-cluster 8081 -n NAMESPACE
```
Open `localhost:8081` to view your Flink jobs locally.
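While the port-forward is active, the same port also serves Flink's REST API; for example, `GET http://localhost:8081/jobs/overview` lists jobs and their states. The helper below parses that response shape; the embedded payload is illustrative sample data, not real output:

```python
import json

def summarize_jobs(payload: str) -> list:
    """Return (name, state) pairs from a Flink REST /jobs/overview response."""
    data = json.loads(payload)
    return [(job["name"], job["state"]) for job in data.get("jobs", [])]

# Illustrative sample of what GET http://localhost:8081/jobs/overview can return
sample = '{"jobs": [{"jid": "abc123", "name": "State machine job", "state": "RUNNING"}]}'
print(summarize_jobs(sample))  # [('State machine job', 'RUNNING')]
```

This is handy for scripting health checks against a forwarded port instead of opening the UI.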
Clean up the job. Remember to also clean up the S3 artifacts that were created for the job, such as checkpoint, high-availability, and savepoint metadata, as well as the CloudWatch logs.
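The S3 artifacts live under the paths you configured in the manifest (`state.checkpoints.dir`, `state.savepoints.dir`, and the jobManager `storageDir`). A small sketch that splits each `s3://` URI into the bucket/prefix pair you would pass to your deletion tooling; the URIs are hypothetical stand-ins for your configured paths:

```python
from urllib.parse import urlparse

def s3_bucket_and_prefix(uri: str) -> tuple:
    """Split an s3:// (or s3a://) URI into (bucket, key prefix) for cleanup tooling."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a"):
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")

# Hypothetical values standing in for CHECKPOINT_S3_STORAGE_PATH and friends
for uri in ("s3://my-flink-bucket/checkpoints",
            "s3://my-flink-bucket/savepoints",
            "s3://my-flink-bucket/ha-metadata"):
    print(s3_bucket_and_prefix(uri))
```

Deleting these prefixes after the job is removed prevents orphaned state from accumulating storage costs.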
For more information about submitting applications to Flink through the Flink Kubernetes operator, see the Flink Kubernetes operator examples in the apache/flink-kubernetes-operator folder on GitHub.
Session mode

With Amazon EMR 6.15.0 and higher, you can run a Flink application with the Flink Kubernetes operator in Session mode on Amazon EMR on EKS.

Create a `FlinkDeployment` definition file named `basic-example-session-cluster.yaml`, as in the following example. If you activated and use one of the opt-in AWS Regions, make sure that you uncomment and configure `fs.s3a.endpoint.region`.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example-session-cluster
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    #fs.s3a.endpoint.region: OPT_IN_AWS_REGION_NAME
    state.checkpoints.dir: CHECKPOINT_S3_STORAGE_PATH
    state.savepoints.dir: SAVEPOINT_S3_STORAGE_PATH
  flinkVersion: v1_17
  executionRoleArn: JOB_EXECUTION_ROLE_ARN
  emrReleaseLabel: "emr-6.15.0-flink-latest"
  jobManager:
    storageDir: HIGH_AVAILABILITY_S3_STORAGE_PATH
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  monitoringConfiguration:
    s3MonitoringConfiguration:
      logUri:
    cloudWatchMonitoringConfiguration:
      logGroupName: LOG_GROUP_NAME
```
Submit the Flink deployment with the following command. This also creates a `FlinkDeployment` object named `basic-example-session-cluster`.

```
kubectl create -f basic-example-session-cluster.yaml -n NAMESPACE
```

Use the following command to confirm that the session cluster `LIFECYCLE` is `STABLE`:

```
kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
```

The output should be similar to the following example:

```
NAME                            JOB STATUS   LIFECYCLE STATE
basic-example-session-cluster                STABLE
```
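For scripting, you can fetch the same resource as JSON (`kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE -o json`) and inspect its status instead of reading the table. The sketch below checks a `status.lifecycleState` field; the field layout is assumed from the operator's CRD, and the embedded payload is an illustrative fragment, not real output:

```python
import json

def is_stable(flinkdeployment_json: str) -> bool:
    """True when the FlinkDeployment's lifecycle state is STABLE (assumed CRD layout)."""
    resource = json.loads(flinkdeployment_json)
    return resource.get("status", {}).get("lifecycleState") == "STABLE"

# Illustrative fragment of `kubectl get ... -o json` output for the session cluster
sample = ('{"metadata": {"name": "basic-example-session-cluster"},'
          ' "status": {"lifecycleState": "STABLE"}}')
print(is_stable(sample))  # True
```

A check like this can gate the next step (submitting the `FlinkSessionJob`) in a deployment script.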
Create a `FlinkSessionJob` custom definition resource file named `basic-session-job.yaml` with the following example content:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job
spec:
  deploymentName: basic-example-session-cluster
  job:
    # If you have your job jar in an S3 bucket, you can use that path.
    # To use a jar in an S3 bucket, set
    # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN)
    # when you install the Flink operator
    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
    parallelism: 2
    upgradeMode: stateless
```

Submit the Flink session job with the following command. This creates a `FlinkSessionJob` object named `basic-session-job`.

```
kubectl apply -f basic-session-job.yaml -n NAMESPACE
```

Use the following command to confirm that the session cluster `LIFECYCLE` is `STABLE` and the `JOB STATUS` is `RUNNING`:

```
kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -n NAMESPACE
```

The output should be similar to the following example:

```
NAME                            JOB STATUS   LIFECYCLE STATE
basic-example-session-cluster   RUNNING      STABLE
```
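A common failure mode at this step is a `FlinkSessionJob` whose `spec.deploymentName` does not exactly match the session cluster's `metadata.name`, which leaves the job unscheduled. A quick cross-check sketch, with the two manifests inlined as plain dicts for illustration:

```python
def session_job_targets_cluster(session_job: dict, deployment: dict) -> bool:
    """Check that a FlinkSessionJob points at the given FlinkDeployment by name."""
    return session_job["spec"]["deploymentName"] == deployment["metadata"]["name"]

# Inlined fragments of the two manifests used in this section
deployment = {"metadata": {"name": "basic-example-session-cluster"}}
session_job = {"spec": {"deploymentName": "basic-example-session-cluster"}}
print(session_job_targets_cluster(session_job, deployment))  # True
```

In practice you could load both YAML files and run this check before `kubectl apply` to catch the mismatch early.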
Access the Flink UI:

```
kubectl port-forward deployments/basic-example-session-cluster 8081 -n NAMESPACE
```
Open `localhost:8081` to view your Flink jobs locally.
Clean up the job. Remember to also clean up the S3 artifacts that were created for the job, such as checkpoint, high-availability, and savepoint metadata, as well as the CloudWatch logs.