本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
執行 Flink 應用程式
使用 Amazon EMR 6.13.0 及更高版本時,您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。使用 Amazon EMR 6.15.0 及更高版本時,您也可以在工作階段模式下執行 Flink 應用程式。本頁面介紹了可用於透過 Amazon EMR on EKS 執行 Flink 應用程式的兩種方法。
主題
注意
提交 Flink 作業時,必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能,可以停用它。依預設會啟用此功能。
必要條件:在使用 Flink Kubernetes Operator 執行 Flink 應用程式之前,請完成 針對 Amazon EMR on EKS 設定 Flink Kubernetes Operator 和 安裝 Kubernetes 運算子 中的步驟。
- Application mode
-
使用 Amazon EMR 6.13.0 及更高版本時,您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在應用程式模式下執行 Flink 應用程式。
-
建立
FlinkDeployment定義檔案,basic-example-app-cluster.yaml如下列範例所示。如果您啟用並使用其中一個選擇加入 AWS 區域,請務必取消註解並設定組態fs.s3a.endpoint.region。apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example-app-cluster spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" #fs.s3a.endpoint.region:OPT_IN_AWS_REGION_NAMEstate.checkpoints.dir:CHECKPOINT_S3_STORAGE_PATHstate.savepoints.dir:SAVEPOINT_S3_STORAGE_PATHflinkVersion: v1_17 executionRoleArn:JOB_EXECUTION_ROLE_ARNemrReleaseLabel: "emr-6.13.0-flink-latest" # 6.13 or higher jobManager: storageDir:HIGH_AVAILABILITY_STORAGE_PATHresource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: # if you have your job jar in S3 bucket you can use that path as well jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: savepoint savepointTriggerNonce: 0 monitoringConfiguration: cloudWatchMonitoringConfiguration: logGroupName:LOG_GROUP_NAME -
使用下列命令提交 Flink 部署。這也將建立名為
basic-example-app-cluster的FlinkDeployment物件。kubectl create -f basic-example-app-cluster.yaml -n <NAMESPACE> -
存取 Flink UI。
kubectl port-forward deployments/basic-example-app-cluster 8081 -nNAMESPACE -
開啟
localhost:8081以在本機檢視 Flink 作業。 -
清除作業。請記得清除為此作業建立的 S3 成品,例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。
如需有關透過 Flink Kubernetes Operator 提交應用程式至 Flink 的詳細資訊,請參閱 GitHub 上
apache/flink-kubernetes-operator資料夾中的 Flink Kubernetes Operator 範例。 -
- Session mode
-
使用 Amazon EMR 6.15.0 及更高版本時,您可以在 Amazon EMR on EKS 上使用 Flink Kubernetes Operator 在工作階段模式下執行 Flink 應用程式。
-
在下列範例中建立名為
basic-example-app-cluster.yamlFlinkDeployment的定義檔案。如果您啟用並使用其中一個選擇加入 AWS 區域,請務必取消註解並設定組態fs.s3a.endpoint.region。apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example-session-cluster spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" #fs.s3a.endpoint.region:OPT_IN_AWS_REGION_NAMEstate.checkpoints.dir:CHECKPOINT_S3_STORAGE_PATHstate.savepoints.dir:SAVEPOINT_S3_STORAGE_PATHflinkVersion: v1_17 executionRoleArn:JOB_EXECUTION_ROLE_ARNemrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: storageDir:HIGH_AVAILABILITY_S3_STORAGE_PATHresource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 monitoringConfiguration: s3MonitoringConfiguration: logUri: cloudWatchMonitoringConfiguration: logGroupName:LOG_GROUP_NAME -
使用下列命令提交 Flink 部署。這也將建立名為
basic-example-session-cluster的FlinkDeployment物件。kubectl create -f basic-example-app-cluster.yaml -nNAMESPACE 使用下列命令確認工作階段叢集
LIFECYCLE為STABLE:kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -nNAMESPACE輸出應類似以下範例:
NAME JOB STATUS LIFECYCLE STATE basic-example-session-cluster STABLE
使用以下範例內容,建立
FlinkSessionJob自訂定義資源檔案basic-session-job.yaml:apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: name: basic-session-job spec: deploymentName: basic-session-deployment job: # If you have your job jar in an S3 bucket you can use that path. # To use jar in S3 bucket, set # OPERATOR_EXECUTION_ROLE_ARN (--set emrContainers.operatorExecutionRoleArn=$OPERATOR_EXECUTION_ROLE_ARN) # when you install Spark operator jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar parallelism: 2 upgradeMode: stateless使用下列命令提交 Flink 工作階段作業。這將建立
FlinkSessionJob物件basic-session-job。kubectl apply -f basic-session-job.yaml -n $NAMESPACE使用下列命令確認工作階段叢集
LIFECYCLE為STABLE,且JOB STATUS為RUNNING:kubectl get flinkdeployments.flink.apache.org basic-example-session-cluster -nNAMESPACE輸出應類似以下範例:
NAME JOB STATUS LIFECYCLE STATE basic-example-session-cluster RUNNING STABLE
-
存取 Flink UI。
kubectl port-forward deployments/basic-example-session-cluster 8081 -nNAMESPACE -
開啟
localhost:8081以在本機檢視 Flink 作業。 -
清除作業。請記得清除為此作業建立的 S3 成品,例如檢查點、高可用性、儲存點中繼資料和 CloudWatch 日誌。
-