本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)
本主題說明如何設定高可用性,並說明它在幾個不同的使用案例中的運作方式。這包括當您使用 任務管理員時,以及當您使用 Flink 原生 kubernetes 時。
Flink Operator 高可用性
我們啟用 Flink Operator 的高可用性,以便可以容錯移轉至待命 Flink Operator,在發生故障時將 Operator 控制迴圈中的停機時間降至最低。依預設會啟用「高可用性」,且起始 Operator 複本的預設數目為 2。可以在 values.yaml 檔案中設定 Helm Chart 的複本欄位。
下列欄位可自訂:
-
replicas(選用,預設值為 2):將此數字設定為大於 1 可建立其他待命 Operator,並允許更快速地復原作業。 -
highAvailabilityEnabled(選用,預設值為 true):控制是否要啟用 HA。將此參數指定為 true 可啟用多可用區部署支援,並設定正確的flink-conf.yaml參數。
透過在 values.yaml 檔案中設定下列組態,停用 operator 的高可用性。
... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...
多可用區部署
我們會在多個可用區域建立 operator Pod。這是一個軟約束,如果您在不同的可用區域中沒有足夠的資源,將在相同的可用區域中排程您的 operator Pod 。
確定領導者複本
如果啟用 HA,則複本使用 Lease 來確定哪些 JM 是領導者,並使用 K8s Lease 進行領導者選舉。您可以描述 Lease 並查看 .Spec.Holder Identity 欄位,以確定目前的領導者
kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"
Flink-S3 互動
設定存取憑證
請確定您已設定 IRSA,具有可存取 S3 儲存貯體的適當 IAM 許可。
從 S3 應用程式模式中擷取作業 jar
Flink Operator 也支援從 S3 中擷取應用程式 jar。只需在 FlinkDeployment 規格中提供 jarURI 的 S3 位置即可。
也可以使用此功能來下載其他成品,例如 PyFLink 指令碼。生成的 Python 指令碼放在路徑 /opt/flink/usrlib/ 下。
以下範例示範如何將此功能用於 PyFLink 作業。請注意 jarURI 和引數欄位。
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless
Flink S3 連接器
Flink 隨附有兩個 S3 連接器 (如下所列)。以下各章節討論何時使用哪個連接器。
檢查點:Presto S3 連接器
-
將 S3 結構描述設為 s3p://
-
用於檢查點到 s3 的建議連接器。如需詳細資訊,請參閱 Apache Flink 文件中的 S3-specific
。
FlinkDeployment 規格範例:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
讀取和寫入 S3:Hadoop S3 連接器
-
將 S3 結構描述設定為
s3://或 (s3a://) -
用於從 S3 讀取和寫入檔案的建議連接器 (只有 S3 連接器實作 Flinks Filesystem 介面
)。 -
根據預設,我們在
flink-conf.yaml檔案fs.s3a.aws.credentials.provider中設定 ,也就是com.amazonaws.auth.WebIdentityTokenCredentialsProvider。如果完全覆寫預設flink-conf並且正在與 S3 進行互動,請確保使用此提供程式。
FlinkDeployment 規格範例
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless
Flink Job Manager
Flink 部署的高可用性 (HA) 可讓作業繼續進行,即使遇到暫時性錯誤以及 JobManager 當機。作業將會重新啟動,但會從上次啟用 HA 的成功檢查點開始。如果沒有啟用 HA,Kubernetes 將重新啟動 JobManager,但您的作業將作為新作業開始,並將失去其進度。設定 HA 之後,我們可以告訴 Kubernetes 將 HA 中繼資料儲存在永久性儲存體中,以便在 JobManager 中發生暫時性失敗時進行參考,然後從上次成功的檢查點恢復我們的作業。
Flink 作業預設為啟用 HA (複本計數設為 2,這將要求您提供 S3 儲存位置,以便 HA 中繼資料持續存在)。
HA 組態
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1
以下是 Job Manager 中上述 HA 組態的描述 (在 .spec.jobManager 下定義):
-
highAvailabilityEnabled(選用,預設值為 true):如果您不想啟用 HA 且不想使用提供的 HA 組態,請將此設定為false。您仍然可以操作「複本」欄位以手動設定 HA。 -
replicas(選用,預設值為 2):將此數字設定為大於 1 可建立其他待命 JobManager,並允許更快速地復原作業。如果停用 HA,則必須將複本計數設定為 1,否則您將繼續收到驗證錯誤 (如果未啟用 HA,則僅支援 1 個複本)。 -
storageDir(必填):因為我們預設使用的複本計數為 2,所以我們必須提供一個持久的 StorageDir。目前,此欄位僅接受 S3 路徑作為儲存位置。
Pod 位置
如果您啟用 HA,我們也會嘗試在相同的可用區域中串連 Job Manager Pod 和 Task Manager Pod,進而改善效能 (透過在相同的AZs區域中擁有 Pod 來降低網路延遲)。這是一個盡最大努力的程序,這表示如果您在排定大部分 Job Manager 和 Task Manager Pod 的 AZ 中沒有足夠的資源,則剩餘的 Pod 仍會排定,但最終可能會在此 AZ 之外的節點上。
確定領導者複本
如果啟用 HA,複本會使用租用來判斷哪個 JM 是領導者,並使用 K8s Configmap 作為儲存此中繼資料的資料儲存。如果您想確定領導者,可以查看 Configmap 的內容,然後查看資料下的關鍵字 org.apache.flink.k8s.leader.restserver,以使用該 IP 地址尋找 K8s Pod。也可使用下列 bash 命令。
ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -nNAMESPACE-o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
Flink 作業:原生 Kubernetes
Amazon EMR 6.13.0 及更高版本支援 Flink Native Kubernetes,以便在 Amazon EKS 叢集上以高可用性模式執行 Flink 應用程式。
注意
提交 Flink 作業時,必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能,可以停用它。依預設會啟用此功能。
若要開啟 Flink 高可用性功能,請在執行 run-application CLI 命令時提供下列 Flink 參數。參數定義於範例下方。
-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET\ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
-
Dhigh-availability.storageDir:您要在其中存放作業的高可用性中繼資料的 Amazon S3 儲存貯體。Dkubernetes.jobmanager.replicas:要建立的 Job Manager Pod 數量,為大於1的整數。Dkubernetes.cluster-id:識別 Flink 叢集的唯一識別碼。