View a markdown version of this page

为 Flink Operator 和 Flink 应用程序使用高可用性(HA) - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 Flink Operator 和 Flink 应用程序使用高可用性(HA)

本主题介绍了如何配置高可用性以及如何在几种不同的用例中使用。包括在使用 Job Manager 和 Flink Native Kubernetes 时。

我们为 Flink Operator 启用了高可用性,这样就可以使用备用 Flink Operator 进行故障转移,从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”,启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 values.yaml 文件中配置副本字段。

以下字段支持自定义:

  • replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他备用 Operator,从而更快地恢复任务。

  • highAvailabilityEnabled(可选,默认值为 true):控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持,并设置正确的 flink-conf.yaml 参数。

values.yaml 文件中设置以下配置可以为 Operator 禁用 HA。

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

多可用区部署

我们在多个可用区中创 Operator Pod。这是一个软约束,如果不同可用区中没有足够的资源,您的 Operator Pod 将被调度到同一可用区中。

确定主副本

如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Lease 来选举主副本。您可以描述租赁并查看. Spec.Holder 用于确定当前领导者的标识字段

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3 互动

配置访问凭证

请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。

从 S3 应用程序模式获取任务 jar

Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 JarurI 的 S3 位置即可。

您也可以使用此功能下载其他工件,例如 PyFlink 脚本。生成的 Python 脚本放在路径 /opt/flink/usrlib/ 下。

以下示例演示了如何将此功能用于作 PyFlink 业。注意 jarURI 和 args 字段。

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink S3 连接器

Flink 随附两个 S3 连接器(如下所示)。以下各节旨在介绍何时使用哪个连接器。

检查点:Presto S3 连接器

  • 将 S3 方案设置为 s3p://

  • 用于检查点到 s3 的推荐连接器。有关更多信息,请参阅 Apache Flink 文档S3-specific中的。

示例 FlinkDeployment 规范:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/

读取/写入 S3:Hadoop S3 连接器

  • 将 S3 方案设置为 s3://s3a://

  • 用于从 S3 读取和写入文件的推荐连接器(仅限实现 Flinks Filesystem 接口的 S3 连接器)。

  • 默认在 flink-conf.yaml 文件中设置 fs.s3a.aws.credentials.provider,即 com.amazonaws.auth.WebIdentityTokenCredentialsProvider。如果完全覆盖默认值 flink-conf,并且正在与 S3 进行交互,请务必使用此提供程序。

示例 FlinkDeployment 规范

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

Flink Deployments 的高可用性 (HA) 允许任务继续取得进展,即使遇到暂时性错误并 JobManager 导致崩溃。任务将重新启动,但会从上次成功启用 HA 的检查点开始。如果未启用 HA,Kubernetes 将重启你的 JobManager,但你的作业将以全新的作业开始并失去其进度。配置 HA 后,我们可以让 Kubernetes 将 HA 元数据存储在永久存储中,以便在中出现暂时故障时参考, JobManager 然后从上次成功的检查点恢复我们的作业。

Flink 任务会默认启用 HA(副本计数设置为 2,这需要您提供 S3 存储位置来永久存储 HA 元数据)。

HA 配置

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

以下是 Job Manager 中上述 HA 配置的描述(在 .spec.jobManager 下定义):

  • highAvailabilityEnabled(可选,默认值为 true):如果不想启用 HA,也不想使用提供的 HA 配置,请将其设置为 false 。您仍然可以操作“replicas”字段来手动配置 HA。

  • replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他待机状态 JobManagers ,从而可以更快地恢复作业。如果禁用 HA,则必须将副本计数设置为 1,否则会不断收到验证错误(如果未启用 HA,则仅支持 1 个副本)。

  • storageDir(必需):由于默认使用副本计数 2,我们必须提供永久 storageDir。目前,此字段仅接受 S3 路径作为存储位置。

Pod 区域

如果您启用 HA,我们还会尝试将 Job Manager 容器和任务管理器容器放在同一个可用区中,这样可以提高性能(通过将 Pod 置于相同可用区来减少网络延迟)。这是一个尽力而为的过程,这意味着如果你在调度了大部分 Job Manager 和 Task Manager Pod 的可用区中没有足够的资源,那么剩余的 pod 仍会被调度,但最终可能会出现在该可用区之外的节点上。

确定主副本

如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本,可以查看 Configmap 的内容,在数据下查看密钥 org.apache.flink.k8s.leader.restserver,找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 及更高版本都支持 Flink 本机 Kubernetes 在 Amazon EKS 集群上以高可用性模式运行 Flink 应用程序。

注意

提交 Flink 作业时,必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能,可以将其禁用。系统会默认启用该功能。

要开启 Flink 高可用性功能,请在运行 run-application CLI 命令时提供以下 Flink 参数。参数在示例的下方定义。

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir:您要存储作业的高可用性元数据的 Amazon S3 存储桶。

    Dkubernetes.jobmanager.replicas:要创建的 Job Manager 容器组(pod)的数量,以大于 1 的整数表示。

    Dkubernetes.cluster-id:标识 Flink 集群的唯一 ID。