

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Amazon EMR 中設定 Flink
<a name="flink-configure"></a>

## 將 Flink 與 Hive Metastore 和 Glue Catalog 搭配使用
<a name="flink-configure-hive"></a>

Amazon EMR 6.9.0 版及更高版本支援 Hive 中繼存放區和 AWS Glue Catalog 搭配 Hive 的 Apache Flink 連接器。本章節概述了使用 Flink 設定 [AWS Glue Catalog](#flink-configure-hive-glue) 和 [Hive 中繼存放區](#flink-configure-hive-metastore)所需的步驟。

**Topics**
+ [使用 Hive 中繼存放區](#flink-configure-hive-metastore)
+ [使用 AWS Glue Data Catalog](#flink-configure-hive-glue)

### 使用 Hive 中繼存放區
<a name="flink-configure-hive-metastore"></a>

1. **建立具有 6.9.0 版或更高版本的 EMR 叢集，以及至少兩個應用程式：**Hive** 和 Flink**。

1. 使用[指令碼執行器](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)將下列指令碼作為 Step Functions 執行：

   `hive-metastore-setup.sh`

   ```
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib 
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib 
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/zh_tw/emr/latest/ReleaseGuide/images/hive.png)

### 使用 AWS Glue Data Catalog
<a name="flink-configure-hive-glue"></a>

1. **建立具有 6.9.0 版或更高版本的 EMR 叢集，以及至少兩個應用程式：**Hive** 和 Flink**。

1. 在 AWS Glue Data Catalog 設定中選取**用於 Hive 資料表中繼資料**，以在叢集中啟用 Data Catalog。

1. 使用[指令碼執行器](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)將下列指令碼作為 Step Functions 執行：[在 Amazon EMR 叢集上執行命令和指令碼](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html)：

   glue-catalog-setup.sh 

   ```
   sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib 
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib 
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar 
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/zh_tw/emr/latest/ReleaseGuide/images/hive.png)

## 使用組態檔案設定 Flink
<a name="flink-configure-config"></a>

您可以使用 Amazon EMR 組態 API 透過組態檔案設定 Flink。可在 API 內設定的檔案包括：
+ `flink-conf.yaml`
+ `log4j.properties`
+ `flink-log4j-session`
+ `log4j-cli.properties`

Flink 的主要組態檔案名為 `flink-conf.yaml`。

**從 設定用於 Flink 的任務槽數量 AWS CLI**

1. 使用下列內容建立檔案 `configurations.json`：

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "taskmanager.numberOfTaskSlots":"2"
         }
       }
   ]
   ```

1. 再以下列組態建立叢集：

   ```
   aws emr create-cluster --release-label emr-7.12.0 \
   --applications Name=Flink \
   --configurations file://./configurations.json \
   --region us-east-1 \
   --log-uri s3://myLogUri \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

**注意**  
您還可以使用 Flink API 變更部分組態。如需詳細資訊，請參閱 Flink 文件中的[https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html](https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html)。  
對於 Amazon EMR 版本 5.21.0 及更高版本，您可以覆寫叢集組態，並且為執行中叢集的每個執行個體群組，指定額外組態分類。您可以使用 Amazon EMR 主控台、 AWS Command Line Interface (AWS CLI) 或 AWS SDK 來執行此操作。如需詳細資訊，請參閱[為執行中叢集的執行個體群組提供組態](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-running-cluster.html)。

### 平行處理選項
<a name="flink-parallelism"></a>

作為自己應用程式的擁有者，您最清楚應在 Flink 內將哪些資源指派給任務。如需本文中的範例，請使用與您用於應用程式的任務執行個體相同數量的任務。一般而言，我們會建議在執行初始等級的平行處理時採用此設定，但您也可以用任務位置來增加平行處理的精細度；通常不應超過每個執行個體的[虛擬核心](https://aws.amazon.com/ec2/virtualcores/)數量。如需有關 Flink 架構的詳細資訊，請參閱 Flink 文件中的[https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html](https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html)。

## 在具有多個主節點的 EMR 叢集上設定 Flink
<a name="flink-multi-master"></a>

在具有多個主節點的 Amazon EMR 叢集中，Flink 的 JobManager 在主節點容錯移轉程序期間仍可繼續使用。從 Amazon EMR 5.28.0 開始，也會自動啟用 JobManager 高可用性。不需要手動設定。

對於 Amazon EMR 5.27.0 版或更早版本，JobManager 是單一故障點。JobManager 故障時會失去所有任務狀態，而且執行中的任務將不再繼續。您可以設定應用程式嘗試計數、設置檢查點作業，以及啟用 ZooKeeper 做為 Flink 的狀態儲存，以啟用 JobManager 高可用性，如下列範例所示：

```
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.resourcemanager.am.max-attempts": "10"
    }
  },
  {
    "Classification": "flink-conf",
    "Properties": {
        "yarn.application-attempts": "10",
        "high-availability": "zookeeper",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink"
    }
  }
]
```

您必須為 YARN 設定最大應用程式主控嘗試次數，以及為 Flink 設定應用程式嘗試次數。如需詳細資訊，請參閱 [YARN 叢集高可用性的組態](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#maximum-application-master-attempts-yarn-sitexml)。您也可以設定 Flink 檢查點作業，讓重新啟動的 JobManager 從之前完成的檢查點恢復執行中的任務。如需詳細資訊，請參閱 [Flink 設定檢查點](https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html)。

## 設定記憶體程序大小
<a name="flink-process-memory"></a>

對於使用 Flink 1.11.x 的 Amazon EMR 版本，您必須在 `flink-conf.yaml` 中設定 JobManager (`jobmanager.memory.process.size`) 和 TaskManager (`taskmanager.memory.process.size`) 的總記憶體程序大小。您可以透過使用組態 API 設定叢集或透過 SSH 手動取消註解這些欄位來設定這些值。Flink 提供了下列預設值。
+ `jobmanager.memory.process.size`：1600m
+ `taskmanager.memory.process.size`：1728m

若要排除 JVM 中繼空間和額外負荷，請使用 Flink 記憶體總大小 (`taskmanager.memory.flink.size`) 而非 `taskmanager.memory.process.size`。`taskmanager.memory.process.size` 的預設值為 1280m。不建議同時設定 `taskmanager.memory.process.size` 和 `taskmanager.memory.process.size`。

所有使用 Flink 1.12.0 及更新版本的 Amazon EMR 版本都將 Flink 開放原始碼集中列出的預設值作為 Amazon EMR 上的預設值，因此您無需自行設定它們。

## 設定日誌輸出檔案大小
<a name="flink-log-output"></a>

Flink 應用程式容器會建立並寫入三種類型的日誌檔案：`.out` 檔案、`.log` 檔案和 `.err` 檔案。僅 `.err` 檔案被壓縮並從檔案系統中移除，而 `.log` 和 `.out` 日誌檔案仍保留在檔案系統中。為了確保這些輸出檔案保持可管理且叢集保持穩定，您可以在 `log4j.properties` 中設定日誌輪換以設定檔案數量上限並限制大小。

**Amazon EMR 5.30.0 版及更新版本**

從 Amazon EMR 5.30.0 開始，Flink 使用組態分類名稱為 `flink-log4j.` 的 log4j2 日誌記錄架構。下列範例組態示範了 log4j2 格式。

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "appender.main.name": "MainAppender",
      "appender.main.type": "RollingFile",
      "appender.main.append" : "false",
      "appender.main.fileName" : "${sys:log.file}",
      "appender.main.filePattern" : "${sys:log.file}.%i",
      "appender.main.layout.type" : "PatternLayout",
      "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n",
      "appender.main.policies.type" : "Policies",
      "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy",
      "appender.main.policies.size.size" : "100MB",
      "appender.main.strategy.type" : "DefaultRolloverStrategy",
      "appender.main.strategy.max" : "10"
    },
  }
]
```

**Amazon EMR 5.29.0 版及更早版本**

在 Amazon EMR 5.29.0 版及更早版本中，Flink 使用 log4j 日誌記錄架構。下列範例組態示範了 log4j 格式。

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.appender.file": "org.apache.log4j.RollingFileAppender",
      "log4j.appender.file.append":"true",
      # keep up to 4 files and each file size is limited to 100MB
      "log4j.appender.file.MaxFileSize":"100MB",
      "log4j.appender.file.MaxBackupIndex":4,
      "log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
      "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
    },
  }
]
```

## 將 Flink 設定為使用 Java 11 執行
<a name="flink-configure-java11"></a>

Amazon EMR 6.12.0 版及更高版本為 Flink 提供了 Java 11 執行期支援。下列各章節描述如何設定叢集，以為 Flink 提供 Java 11 執行期支援。

**Topics**
+ [在建立叢集時為 Java 11 設定 Flink](#flink-configure-java11-create)
+ [在執行中的叢集上為 Java 11 設定 Flink](#flink-configure-java11-update)
+ [確認執行中叢集上 Flink 的 Java 執行期](#flink-configure-java11-confirm)

### 在建立叢集時為 Java 11 設定 Flink
<a name="flink-configure-java11-create"></a>

使用下列步驟透過 Flink 和 Java 11 執行期建立 EMR 叢集。您在其中新增 Java 11 執行期支援的組態檔案是 `flink-conf.yaml`。

------
#### [ Console ]

**在主控台中建立具有 Flink 和 Java 11 執行時間的叢集**

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr) 開啟 Amazon EMR 主控台。

1. 在導覽窗格中的 **EC2 上的 EMR** 下，選擇**叢集**，然後選擇**建立叢集**。

1. 選取 Amazon EMR 6.12.0 版或更高版本，然後選擇安裝 Flink 應用程式。選取您要在您的叢集上安裝的任何其他應用程式。

1. 繼續設定您的叢集。在選用**軟體設定**區段中，使用預設**輸入組態**選項，然後輸入下列組態：

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. 繼續設定並啟動您的叢集。

------
#### [ AWS CLI ]

**從 CLI 使用 Flink 和 Java 11 執行期建立叢集**

1. 建立將 Flink 設定為使用 Java 11 的組態檔案 `configurations.json`。

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. 從 AWS CLI中，使用 Amazon EMR 6.12.0 版或更新版本建立新的 EMR 叢集，並安裝 Flink 應用程式，如下列範例所示：

   ```
   aws emr create-cluster --release-label emr-6.12.0 \ 
   --applications Name=Flink \ 
   --configurations file://./configurations.json \ 
   --region us-east-1 \ 
   --log-uri s3://myLogUri \ 
   --instance-type m5.xlarge \ 
   --instance-count 2 \ 
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

------

### 在執行中的叢集上為 Java 11 設定 Flink
<a name="flink-configure-java11-update"></a>

使用下列步驟透過 Flink 和 Java 11 執行期更新執行中的 EMR 叢集。您在其中新增 Java 11 執行期支援的組態檔案是 `flink-conf.yaml`。

------
#### [ Console ]

**在主控台中使用 Flink 和 Java 11 執行時間更新執行中的叢集**

1. 登入 AWS 管理主控台，並在 https：//[https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr) 開啟 Amazon EMR 主控台。

1. 在導覽窗格中的 **EC2 上的 EMR** 下，選擇**叢集**，然後選取您要更新的叢集。
**注意**  
叢集必須使用 Amazon EMR 6.12.0 版或更高版本才能支援 Java 11。

1. 選取**組態**標籤。

1. 在**執行個體群組組態**區段中，選取您要更新的**執行中**執行個體群組，然後從清單動作功能表中選擇**重新設定**。

1. 使用**編輯屬性**選項重新設定執行個體群組，如下所示。在每一項之後選取**新增組態**。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/emr/latest/ReleaseGuide/flink-configure.html)

1. 選取**儲存變更**以新增組態。

------
#### [ AWS CLI ]

**從 CLI 使用 Flink 和 Java 11 執行期更新執行中的叢集**

使用 `modify-instance-groups` 命令，為執行中叢集中的執行個體群組指定新組態。

1. 首先，建立將 Flink 設定為使用 Java 11 的組態檔案 `configurations.json`。在下列範例中，將 *ig-1xxxxxxx9* 取代為您要重新設定的執行個體群組的 ID。將檔案儲存在執行 `modify-instance-groups` 命令的相同目錄中。

   ```
   [
      {
         "InstanceGroupId":"ig-1xxxxxxx9",
         "Configurations":[
            {
               "Classification":"flink-conf",
               "Properties":{
                 "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "env.java.home":"/usr/lib/jvm/jre-11"
               },
               "Configurations":[]
            }
         ]
      }
   ]
   ```

1. 從 中 AWS CLI，執行下列命令。取代您要重新設定的執行個體群組的 ID：

   ```
   aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \
   --instance-groups file://configurations.json
   ```

------

### 確認執行中叢集上 Flink 的 Java 執行期
<a name="flink-configure-java11-confirm"></a>

若要確定執行中的叢集的 Java 執行期，請使用 SSH 登入主節點，如[使用 SSH 連接至主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)中所述。然後執行以下命令：

```
ps -ef | grep flink
```

具有 `-ef` 選項的 `ps` 命令列出了系統上所有執行中的程序。您可以使用 `grep` 篩選該輸出，以尋找提及的字串 `flink`。檢閱 Java 執行階段環境 (JRE) 值 `jre-XX` 的輸出。在下列輸出中，`jre-11` 指出在執行期為 Flink 選擇 Java 11。

```
flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.
```

或者，[使用 SSH 登入主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)，然後使用命令 `flink-yarn-session -d` 啟動 Flink YARN 工作階段。輸出顯示 Flink 的 Java 虛擬機器 (JVM)，在下列範例中為 `java-11-amazon-corretto`：

```
2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64
```