

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 建立並執行 Managed Service for Apache Flink 應用程式
<a name="gs-table-create"></a>

在本練習中，您會建立 Managed Service for Apache Flink 應用程式，以 Kinesis 資料串流做為來源和目的地。

**Topics**
+ [建立相依資源](#gs-table-resources)
+ [設定您的本機開發環境](#gs-table-2)
+ [下載並檢查 Apache Flink 串流 Java 程式碼](#gs-table-5)
+ [在本機執行您的應用程式](#gs-table-run-locally)
+ [觀察應用程式將資料寫入 S3 儲存貯體](#gs-table-input-output)
+ [停止應用程式在本機執行](#gs-table-stop)
+ [編譯和封裝您的應用程式程式碼](#gs-table-5.5)
+ [上傳應用程式碼 JAR 檔案](#gs-table-6)
+ [建立和設定 Managed Service for Apache Flink 應用程式](#gs-table-7)

## 建立相依資源
<a name="gs-table-resources"></a>

在為本練習建立 Managed Service for Apache Flink 之前，先建立下列相依資源：
+ Amazon S3 儲存貯體，用於存放應用程式的程式碼和寫入應用程式輸出。
**注意**  
本教學假設您正在 us-east-1 區域中部署應用程式。如果您使用另一個區域，則必須相應地調整所有步驟。

### 建立 Amazon S3 儲存貯體
<a name="gs-table-resources-s3"></a>

您可以使用主控台建立 Amazon S3 儲存貯體。如需建立這些資源的相關指示，請參閱以下主題：
+ 《Amazon Simple Storage Service 使用者指南》中的[如何建立 S3 儲存貯體](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。**透過附加您的登入名稱，為 Amazon S3 儲存貯體提供全域唯一名稱。
**注意**  
請確定您在用於本教學課程的區域中建立儲存貯體。教學課程的預設值為 us-east-1。

### 其他資源
<a name="gs-table-resources-cw"></a>

建立應用程式時，Managed Service for Apache Flink 會建立下列 Amazon CloudWatch 資源 (如果尚不存在該資源)：
+ 名為 `/AWS/KinesisAnalytics-java/<my-application>` 的日誌群組。
+ 名為 `kinesis-analytics-log-stream` 的日誌串流。

## 設定您的本機開發環境
<a name="gs-table-2"></a>

對於開發和偵錯，您可以直接從您選擇的 IDE 在機器上執行 Apache Flink 應用程式。任何 Apache Flink 相依性都會使用 Maven 做為一般 Java 相依性處理。

**注意**  
在您的開發機器上，您必須安裝 Java JDK 11、Maven 和 Git。我們建議您使用開發環境，例如 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/)。若要驗證您是否符合所有先決條件，請參閱 [滿足完成練習的先決條件](getting-started.md#setting-up-prerequisites)。**您不需要**在機器上安裝 Apache Flink 叢集。

### 驗證您的 AWS 工作階段
<a name="get-started-exercise-table-authenticate"></a>

應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時，您必須擁有有效的已 AWS 驗證工作階段，具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段：

1. 如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔，請參閱 [設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 如果您的 IDE 有要整合的外掛程式 AWS，您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊，請參閱 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) 和 [AWS Toolkit for編譯應用程式或執行 Eclipse](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)。

## 下載並檢查 Apache Flink 串流 Java 程式碼
<a name="gs-table-5"></a>

此範例的應用程式碼可從 GitHub 取得。

**下載 Java 應用程式的程式碼**

1. 使用以下指令複製遠端儲存庫：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 導覽至 `./java/GettingStartedTable` 目錄。

### 檢閱應用程式元件
<a name="get-started-exercise-table-components"></a>

應用程式完全在 `com.amazonaws.services.msf.BasicTableJob`類別中實作。`main()` 方法定義來源、轉換和接收器。此方法結尾的執行陳述式會啟動執行。

**注意**  
為了獲得最佳的開發人員體驗，應用程式設計為在 Amazon Managed Service for Apache Flink 和本機上執行，在 IDE 中進行開發時不會有任何程式碼變更。
+ 若要讀取執行時間組態，以便在 Amazon Managed Service for Apache Flink 和 IDE 中執行時運作，應用程式會自動偵測它是否在 IDE 中的本機執行獨立。在這種情況下，應用程式會以不同的方式載入執行時間組態：

  1. 當應用程式偵測到其在您的 IDE 中以獨立模式執行時，請形成包含在專案**資源**資料夾中`application_properties.json`的檔案。檔案的內容如下。

  1. 當應用程式在 Amazon Managed Service for Apache Flink 中執行時，預設行為會從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入應用程式組態。請參閱 [建立和設定 Managed Service for Apache Flink 應用程式](get-started-exercise.md#get-started-exercise-7)。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` 方法會定義應用程式資料流程並執行它。
  + 初始化預設串流環境。在此範例中，我們會示範如何建立 `StreamExecutionEnvironment` 以與 DataStream API 搭配使用，以及建立 `StreamTableEnvironment`以與 SQL 和資料表 API 搭配使用。兩個環境物件是相同執行時間環境的兩個不同參考，以使用不同的 APIs。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + 載入應用程式組態參數。這會自動從正確的位置載入它們，視應用程式執行的位置而定：

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + 當 Flink 完成[檢查點](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing)時，應用程式用來將結果寫入 Amazon S3 輸出檔案的 [FileSystem 接收器連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink)。您必須啟用檢查點，才能將檔案寫入目的地。當應用程式在 Amazon Managed Service for Apache Flink 中執行時，應用程式組態會控制檢查點，並預設啟用它。相反地，在本機執行時，檢查點預設為停用。應用程式偵測到它在本機執行，並每 5，000 毫秒設定檢查點。

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + 此應用程式不會從實際外部來源接收資料。它會產生隨機資料，以透過 [DataGen 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)進行處理。此連接器適用於 DataStream API、SQL 和資料表 API。為了示範 APIs之間的整合，應用程式會使用 DataStram API 版本，因為它提供更多彈性。每個記錄都是由`StockPriceGeneratorFunction`在此情況下稱為 的產生器函數產生，您可以在其中放置自訂邏輯。

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + 在 DataStream API 中，記錄可以有自訂類別。類別必須遵循特定規則，以便 Flink 可以將其用作記錄。如需詳細資訊，請參閱[支援的資料類型](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types)。在此範例中，`StockPrice`類別是 [POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)。
  + 然後，來源會連接至執行環境，產生 `DataStream`的 `StockPrice`。此應用程式不使用[事件時間語意](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time)，也不會產生浮水印。執行平行處理為 1 的 DataGenerator 來源，與應用程式的其餘部分無關。

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + 資料處理流程中的後續內容是使用資料表 API 和 SQL 來定義。為此，我們將 StockPrices 的 DataStream 轉換為資料表。資料表的結構描述會自動從 `StockPrice`類別推斷。

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + 下列程式碼片段說明如何使用程式設計資料表 API 定義檢視和查詢：

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + 系統會定義接收器資料表，將結果寫入 Amazon S3 儲存貯體做為 JSON 檔案。為了說明以程式設計方式定義檢視的差異，使用資料表 API 時，系統會使用 SQL 定義目的地資料表。

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + 的最後一個步驟是`executeInsert()`將篩選的股票價格檢視插入目的地資料表。此方法會啟動執行我們到目前為止定義的資料流程。

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### 使用 pom.xml 檔案
<a name="get-started-exercise-5-2"></a>

pom.xml 檔案會定義應用程式所需的所有相依性，並設定 Maven Shade 外掛程式來建置包含 Flink 所需所有相依性的 fat-jar。
+ 有些相依性具有`provided`範圍。當應用程式在 Amazon Managed Service for Apache Flink 中執行時，這些相依性會自動可用。應用程式或本機 IDE 中的應用程式需要它們。如需詳細資訊，請參閱 （更新至 TableAPI)[在本機執行您的應用程式](get-started-exercise.md#get-started-exercise-5-run)。請確定您使用的 Flink 版本與您在 Amazon Managed Service for Apache Flink 中使用的執行時間相同。若要使用 TableAPI 和 SQL，您必須包含 `flink-table-planner-loader`和 `flink-table-runtime-dependencies`，兩者皆包含 `provided` 範圍。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ 您必須使用預設範圍將其他 Apache Flink 相依性新增至 pom。例如，[DataGen 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)、[FileSystem SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/)和 [JSON 格式](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/)。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ 若要在本機執行時寫入 Amazon S3，S3 Hadoop 檔案系統也包含 wit `provided`範圍。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Maven Java 編譯器外掛程式可確保程式碼是根據 Apache Flink 目前支援的 JDK 版本 Java 11 編譯。
+ Maven Shade 外掛程式會封裝 fat-jar，不包括執行時間提供的一些程式庫。它也會指定兩個轉換器： `ServicesResourceTransformer`和 `ManifestResourceTransformer`。後者會設定 類別，其中包含啟動應用程式的 `main`方法。如果您重新命名主類別，請不要忘記更新此轉換器。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 在本機執行您的應用程式
<a name="gs-table-run-locally"></a>

您可以在 IDE 中的本機執行和偵錯 Flink 應用程式。

**注意**  
繼續之前，請確認輸入和輸出串流是否可用。請參閱 [建立兩個 Amazon Kinesis 資料串流](get-started-exercise.md#get-started-exercise-1)。此外，請確認您具有從兩個串流讀取和寫入的許可。請參閱 [驗證您的 AWS 工作階段](get-started-exercise.md#get-started-exercise-2-5)。  
設定本機開發環境需要 Java 11 JDK、Apache Maven 和 IDE for Java 開發。確認您符合必要的先決條件。請參閱 [滿足完成練習的先決條件](getting-started.md#setting-up-prerequisites)。

### 將 Java 專案匯入 IDE
<a name="gs-table-import"></a>

若要開始在 IDE 中使用應用程式，您必須將其匯入為 Java 專案。

您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中，將`./jave/GettingStartedTable`子目錄中的內容匯入 IDE 。

使用 Maven 將程式碼插入為現有的 Java 專案。

**注意**  
匯入新 Java 專案的確切程序取決於您使用的 IDE。

### 修改本機應用程式組態
<a name="gs-table-modify-2"></a>

在本機執行時，應用程式會使用 下專案資源資料夾中 `application_properties.json` 檔案中的組態`./src/main/resources`。對於此教學課程應用程式，組態參數是儲存貯體的名稱，以及寫入資料的路徑。

編輯組態並修改 Amazon S3 儲存貯體的名稱，以符合您在本教學課程開始時建立的儲存貯體。

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "{{<bucket-name>}}",
 "path": "output"
 }
 }
]
```

**注意**  
組態屬性`name`必須僅包含儲存貯體名稱，例如 `my-bucket-name`。請勿包含任何字首，例如 `s3://`或結尾斜線。  
如果您修改路徑，請省略任何正斜線或尾斜線。

### 設定 IDE 執行組態
<a name="gs-table-setupIDE"></a>

您可以執行主要類別 ，直接從 IDE 執行和偵錯 Flink 應用程式`com.amazonaws.services.msf.BasicTableJob`，就像執行任何 Java 應用程式一樣。在執行應用程式之前，您必須設定執行組態。設定取決於您使用的 IDE。例如，請參閱 IntelliJ IDEA 文件中的[執行/偵錯組態](https://www.jetbrains.com/help/idea/run-debug-configuration.html)。特別是，您必須設定下列項目：

1. **將`provided`相依性新增至 classpath**。這是必要的，以確保在本機執行時，具有`provided`範圍的相依性會傳遞至應用程式。如果沒有此設定，應用程式會立即顯示`class not found`錯誤。

1. **傳遞 AWS 登入資料以存取應用程式的 Kinesis 串流**。最快的方法是使用 [AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)。在執行組態中使用此 IDE 外掛程式，您可以選取特定 AWS 設定檔。 AWS 身分驗證會使用此設定檔進行。您不需要直接傳遞 AWS 登入資料。

1. 確認 IDE 使用 **JDK 11 **執行應用程式。

### 在 IDE 中執行應用程式
<a name="gs-table-runIDE"></a>

設定 的執行組態後`BasicTableJob`，您可以像一般 Java 應用程式一樣執行或偵錯組態。

**注意**  
您無法`java -jar ...`直接從命令列使用 執行 Maven 產生的 fat-jar。此 jar 不包含獨立執行應用程式所需的 Flink 核心相依性。

當應用程式成功啟動時，它會記錄有關獨立微型叢集和連接器初始化的一些資訊。這後面接著一些 INFO 和一些 WARN 日誌，Flink 通常會在應用程式啟動時發出。

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

初始化完成後，應用程式不會發出任何進一步的日誌項目。**資料流動時，不會發出日誌。**

若要驗證應用程式是否正確處理資料，您可以檢查輸出儲存貯體的內容，如下節所述。

**注意**  
 不發出有關流動資料的日誌是 Flink 應用程式的正常行為。在每個記錄上發出日誌對於偵錯可能很方便，但在生產環境中執行時可能會增加相當大的開銷。

## 觀察應用程式將資料寫入 S3 儲存貯體
<a name="gs-table-input-output"></a>

此範例應用程式會在內部產生隨機資料，並將此資料寫入您設定的目的地 S3 儲存貯體。除非您修改預設組態路徑，否則資料將寫入`output`路徑，後面接著資料和小時分割，格式為 `./output/<yyyy-MM-dd>/<HH>`。

[ FileSystem 接收器連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink)會在 Flink 檢查點上建立新的檔案。在本機執行時，應用程式會每 5 秒 (5，000 毫秒） 執行檢查點，如程式碼中所指定。

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**瀏覽 S3 儲存貯體並觀察應用程式寫入的檔案**

1. 

   1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇您先前建立的儲存貯體。

1. 導覽至`output`路徑，然後導覽至與 UTC 時區中目前時間對應的日期和小時資料夾。

1. 定期重新整理以觀察每 5 秒出現的新檔案。

1. 選取並下載一個檔案來觀察內容。
**注意**  
根據預設，檔案沒有副檔名。內容的格式為 JSON。您可以使用任何文字編輯器開啟檔案來檢查內容。

## 停止應用程式在本機執行
<a name="gs-table-stop"></a>

停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於 IDE。

## 編譯和封裝您的應用程式程式碼
<a name="gs-table-5.5"></a>

在本節中，您會使用 Apache Maven 編譯 Java 程式碼，並將其封裝至 JAR 檔案。您可以使用 Maven 命令列工具或 IDE 來編譯和封裝程式碼。

**使用 Maven 命令列編譯和封裝**

移至包含 Jave GettingStarted 專案的目錄，並執行下列命令：

```
$ mvn package
```

**使用 IDE 編譯和封裝**

`mvn package` 從 IDE Maven 整合執行 。

在這兩種情況下，`target/amazon-msf-java-table-app-1.0.jar`都會建立 JAR 檔案。

**注意**  
從 IDE 執行*建置專案*可能不會建立 JAR 檔案。

## 上傳應用程式碼 JAR 檔案
<a name="gs-table-6"></a>

在本節中，您將您在上一節中建立的 JAR 檔案上傳至在本教學課程開始時建立的 Amazon S3 儲存貯體。如果您已完成，請完成 [建立 Amazon S3 儲存貯體](#gs-table-resources-s3)。

**上傳應用程式的程式碼**

1. 開啟位於 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 的 Amazon S3 主控台。

1. 選擇您先前為應用程式程式碼建立的儲存貯體。

1. 選擇**上傳**欄位。

1. 選擇 **Add files (新增檔案)**。

1. 導覽至上一節產生的 JAR 檔案：`target/amazon-msf-java-table-app-1.0.jar`。

1. 選擇**上傳**，而不變更任何其他設定。
**警告**  
 請務必在 中選取正確的 JAR 檔案`<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar`。  
目標目錄也包含您不需要上傳的其他 JAR 檔案。

## 建立和設定 Managed Service for Apache Flink 應用程式
<a name="gs-table-7"></a>

您可以使用 主控台或 建立和設定 Managed Service for Apache Flink 應用程式 AWS CLI。在本教學課程中，您將使用 主控台。

**注意**  
當您使用 主控台建立應用程式時，系統會為您建立 AWS Identity and Access Management (IAM) 和 Amazon CloudWatch Logs 資源。當您使用 建立應用程式時 AWS CLI，您必須分別建立這些資源。

### 建立應用程式
<a name="gs-table-7-console-create"></a>

1. 登入 AWS 管理主控台，並在 https：//https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。

1. 確認已選取正確的區域：美國東部 （維吉尼亞北部） us-east-1。

1. 在右側功能表中，選擇 **Apache Flink 應用程式**，然後選擇**建立串流應用程式**。或者，在初始頁面的**開始使用**區段中選擇**建立串流應用程式**。

1. 在**建立串流應用程式**頁面上，完成下列操作：
   + 針對**選擇設定串流處理應用程式的方法**，選擇**從頭開始建立**。
   + 針對 **Apache Flink 組態、Application Flink 版本**，選擇 **Apache Flink 1.19**。
   + 在**應用程式組態**區段中，完成下列操作：
     + 在**應用程式名稱**中，輸入 **MyApplication**。
     + 對於 **Description (說明)**，輸入 **My Java Table API test app**。
     + 針對**存取應用程式資源**，選擇**使用必要政策建立/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1**。
   + 在**應用程式設定的範本**中，完成下列操作：
     + 針對**範本**，選擇**開發**。

1. 選擇**建立串流應用程式**。

**注意**  
使用主控台建立 Managed Service for Apache Flink 應用程式時，可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名：  
政策：`kinesis-analytics-service-{{MyApplication}}-{{us-east-1}}`
角色：`kinesisanalytics-{{MyApplication}}-{{us-east-1}}`

### 編輯 IAM 政策
<a name="gs-table-7-console-iam"></a>

編輯 IAM 政策以新增 Amazon S3 儲存貯體存取許可。

**編輯 IAM 政策以新增 S3 儲存貯體許可**

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。

1. 選擇**政策**。選擇主控台為您在上一節所建立的 **`kinesis-analytics-service-MyApplication-us-east-1`** 政策。

1. 選擇**編輯**，然後選擇 **JSON** 標籤。

1. 將下列政策範例的反白部分新增至政策。將範例帳戶 ID ({{012345678901}}) 取代為您的帳戶 ID，並將 {{<bucket-name>}} 取代為您建立的 S3 儲存貯體名稱。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket}}/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }{{, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket2}}"
               ]
          }}}
       ]
   }
   ```

------

1.  選擇**下一步**，然後選擇**儲存變更**。

### 設定應用程式
<a name="gs-table-7-console-configure"></a>

編輯應用程式以設定應用程式碼成品。

**設定應用程式**

1. 在 **MyApplication** 頁面，選擇**設定**。

1. 在**複寫程式碼位置**區段中，選擇**設定**。
   + 針對 **Amazon S3 儲存貯**體，選取您先前為應用程式碼建立的儲存貯體。選擇**瀏覽**並選擇正確的儲存貯體，然後選擇**選擇**。請勿按一下儲存貯體名稱。
   + 對於 **Amazon S3 物件的路徑**，請輸入 **amazon-msf-java-table-app-1.0.jar**。

1. 對於**存取許可**，選擇**建立/更新 IAM 角色 `kinesis-analytics-MyApplication-us-east-1`**。

1. 在**執行期屬性**區段中，新增下列屬性。

1. 選擇**新增項目**並新增下列每個參數：    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/zh_tw/managed-flink/latest/java/gs-table-create.html)

1. 請勿修改任何其他設定。

1. 選擇**儲存變更**。

**注意**  
當您選擇啟用 Amazon CloudWatch 日誌時，Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示：  
日誌群組：`/aws/kinesis-analytics/MyApplication`
日誌串流：`kinesis-analytics-log-stream`

### 執行應用程式
<a name="gs-table-7-console-run"></a>

應用程式現在已設定並準備好執行。

**執行應用程式**

1. 返回 Amazon Managed Service for Apache Flink 中的主控台頁面，然後選擇 **MyApplication**。

1. 選擇**執行**以啟動應用程式。

1. 在**應用程式還原組態**上，選擇**使用最新的快照執行**。

1. 選擇**執行**。

1. **應用程式詳細資訊**中的**狀態**會在應用程式啟動`Running`後從 轉換為 `Ready` `Starting` ，然後轉換為 。

當應用程式處於 `Running` 狀態時，您可以開啟 Flink 儀表板。

**開啟儀表板並檢視任務**

1. 選擇**開啟 Apache Flink 破折號**。儀表板會在新頁面中開啟。

1. 在**執行中任務**清單中，選擇您可以看到的單一任務。
**注意**  
如果您未正確設定執行時間屬性或編輯 IAM 政策，應用程式狀態可能會變更為 `Running`，但 Flink 儀表板會顯示任務持續重新啟動。當應用程式設定錯誤或缺少存取外部資源的許可時，這是常見的失敗案例。  
發生這種情況時，請檢查 Flink 儀表板中的**例外**狀況索引標籤，以調查問題的原因。

### 觀察執行中應用程式的指標
<a name="gs-observe-metrics"></a>

在 **MyApplication** 頁面的 **Amazon CloudWatch 指標**區段中，您可以從執行中的應用程式查看一些基本指標。

**檢視指標**

1. 在**重新整理**按鈕旁，從下拉式清單中選取 **10 秒**。

1. 當應用程式執行正常時，您可以看到**運作時間**指標持續增加。

1. **fullrestarts** 指標應為零。如果增加，組態可能會發生問題。檢閱 Flink 儀表板上的**例外狀況**索引標籤以調查問題。

1. 在運作狀態良好的應用程式中，**失敗檢查點指標的數量**應為零。
**注意**  
此儀表板會顯示一組固定的指標，精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。

### 觀察應用程式將資料寫入目的地儲存貯體
<a name="gs-observe-output"></a>

您現在可以觀察在 Amazon Managed Service for Apache Flink 中執行的應用程式，將檔案寫入 Amazon S3。

若要觀察檔案，請遵循您在本機執行應用程式時用來檢查所寫入檔案的相同程序。請參閱 [觀察應用程式將資料寫入 S3 儲存貯體](#gs-table-input-output)。

請記住，應用程式會在 Flink 檢查點寫入新檔案。在 Amazon Managed Service for Apache Flink 上執行時，預設會啟用檢查點，並每 60 秒執行一次。應用程式大約每 1 分鐘建立新檔案。

### 停止應用程式
<a name="gs-table-7-console-stop"></a>

若要停止應用程式，請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面`MyApplication`。

**停止應用程式**

1. 從**動作**下拉式清單中，選擇**停止**。

1. **應用程式詳細資訊**中的**狀態**會從 轉換為 `Running` `Stopping`，然後在應用程式完全停止`Ready`時轉換為 。
**注意**  
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。