本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
为 Flink Operator 和 Flink 应用程序使用高可用性(HA)
本主题介绍了如何配置高可用性以及如何在几种不同的用例中使用。包括在使用 Job Manager 和 Flink Native Kubernetes 时。
Flink Operator 高可用性
我们为 Flink Operator 启用了高可用性,这样就可以使用备用 Flink Operator 进行故障转移,从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”,启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 values.yaml 文件中配置副本字段。
以下字段支持自定义:
-
replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他备用 Operator,从而更快地恢复任务。 -
highAvailabilityEnabled(可选,默认值为 true):控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持,并设置正确的flink-conf.yaml参数。
在 values.yaml 文件中设置以下配置可以为 Operator 禁用 HA。
... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...
多可用区部署
我们在多个可用区中创 Operator Pod。这是一个软约束,如果不同可用区中没有足够的资源,您的 Operator Pod 将被调度到同一可用区中。
确定主副本
如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Lease 来选举主副本。您可以描述租赁并查看. Spec.Holder 用于确定当前领导者的标识字段
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
Flink-S3 互动
配置访问凭证
请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。
从 S3 应用程序模式获取任务 jar
Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 JarurI 的 S3 位置即可。
您也可以使用此功能下载其他工件,例如 PyFlink 脚本。生成的 Python 脚本放在路径 /opt/flink/usrlib/ 下。
以下示例演示了如何将此功能用于作 PyFlink 业。注意 jarURI 和 args 字段。
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless
Flink S3 连接器
Flink 随附两个 S3 连接器(如下所示)。以下各节旨在介绍何时使用哪个连接器。
检查点:Presto S3 连接器
-
将 S3 方案设置为 s3p://
-
用于检查点到 s3 的推荐连接器。有关更多信息,请参阅 Apache Flink 文档S3-specific
中的。
示例 FlinkDeployment 规范:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
读取/写入 S3:Hadoop S3 连接器
-
将 S3 方案设置为
s3://或s3a:// -
用于从 S3 读取和写入文件的推荐连接器(仅限实现 Flinks Filesystem 接口
的 S3 连接器)。 -
默认在
flink-conf.yaml文件中设置fs.s3a.aws.credentials.provider,即com.amazonaws.auth.WebIdentityTokenCredentialsProvider。如果完全覆盖默认值flink-conf,并且正在与 S3 进行交互,请务必使用此提供程序。
示例 FlinkDeployment 规范
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless
Flink Job Manager
Flink Deployments 的高可用性 (HA) 允许任务继续取得进展,即使遇到暂时性错误并 JobManager 导致崩溃。任务将重新启动,但会从上次成功启用 HA 的检查点开始。如果未启用 HA,Kubernetes 将重启你的 JobManager,但你的作业将以全新的作业开始并失去其进度。配置 HA 后,我们可以让 Kubernetes 将 HA 元数据存储在永久存储中,以便在中出现暂时故障时参考, JobManager 然后从上次成功的检查点恢复我们的作业。
Flink 任务会默认启用 HA(副本计数设置为 2,这需要您提供 S3 存储位置来永久存储 HA 元数据)。
HA 配置
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1
以下是 Job Manager 中上述 HA 配置的描述(在 .spec.jobManager 下定义):
-
highAvailabilityEnabled(可选,默认值为 true):如果不想启用 HA,也不想使用提供的 HA 配置,请将其设置为false。您仍然可以操作“replicas”字段来手动配置 HA。 -
replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他待机状态 JobManagers ,从而可以更快地恢复作业。如果禁用 HA,则必须将副本计数设置为 1,否则会不断收到验证错误(如果未启用 HA,则仅支持 1 个副本)。 -
storageDir(必需):由于默认使用副本计数 2,我们必须提供永久 storageDir。目前,此字段仅接受 S3 路径作为存储位置。
Pod 区域
如果您启用 HA,我们还会尝试将 Job Manager 容器和任务管理器容器放在同一个可用区中,这样可以提高性能(通过将 Pod 置于相同可用区来减少网络延迟)。这是一个尽力而为的过程,这意味着如果你在调度了大部分 Job Manager 和 Task Manager Pod 的可用区中没有足够的资源,那么剩余的 pod 仍会被调度,但最终可能会出现在该可用区之外的节点上。
确定主副本
如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本,可以查看 Configmap 的内容,在数据下查看密钥 org.apache.flink.k8s.leader.restserver,找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -nNAMESPACE-o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
Flink 作业 - 本机 Kubernetes
Amazon EMR 6.13.0 及更高版本都支持 Flink 本机 Kubernetes 在 Amazon EKS 集群上以高可用性模式运行 Flink 应用程序。
注意
提交 Flink 作业时,必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能,可以将其禁用。系统会默认启用该功能。
要开启 Flink 高可用性功能,请在运行 run-application CLI 命令时提供以下 Flink 参数。参数在示例的下方定义。
-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET\ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
-
Dhigh-availability.storageDir:您要存储作业的高可用性元数据的 Amazon S3 存储桶。Dkubernetes.jobmanager.replicas:要创建的 Job Manager 容器组(pod)的数量,以大于1的整数表示。Dkubernetes.cluster-id:标识 Flink 集群的唯一 ID。