本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
建立並執行 Managed Service for Apache Flink 應用程式
在此步驟中,您會建立 Managed Service for Apache Flink 應用程式,並將 Kinesis 資料串流做為來源和接收器。
本節包含下列步驟:
建立相依資源
在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:
-
兩個用於輸入和輸出的 Kinesis 資料串流
-
存放應用程式程式碼的 Amazon S3 儲存貯體
注意
本教學假設您正在 us-east-1 美國東部 (維吉尼亞北部) 區域中部署應用程式。如果您使用另一個區域,請相應地調整所有步驟。
建立兩個 Amazon Kinesis 資料串流
在為本練習建立 Managed Service for Apache Flink 應用程式之前,請先建立兩個 Kinesis 資料串流 (ExampleInputStream 和 ExampleOutputStream)。您的應用程式會將這些串流用於應用程式來源和目的地串流。
您可以使用 Amazon Kinesis 主控台或下列 AWS CLI 命令來建立這些串流。如需主控台指示,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。若要使用 建立串流 AWS CLI,請使用下列命令,調整至您用於應用程式的 區域。
建立資料串流 (AWS CLI)
-
若要建立第一個串流 (
ExampleInputStream),請使用下列 Amazon Kinesiscreate-streamAWS CLI 命令:$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ -
若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為
ExampleOutputStream:$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \
為應用程式碼建立 Amazon S3 儲存貯體
您可以使用主控台建立 Amazon S3 儲存貯體。若要了解如何使用主控台建立 Amazon S3 儲存貯體,請參閱《Amazon S3 使用者指南》中的建立儲存貯體。使用全域唯一名稱命名 Amazon S3 儲存貯體,例如透過附加您的登入名稱。
注意
請確定您在用於本教學課程的 區域中建立儲存貯體 (us-east-1)。
其他資源
當您建立應用程式時,Managed Service for Apache Flink 會在資源不存在時自動建立下列 Amazon CloudWatch 資源:
-
名為
/AWS/KinesisAnalytics-java/<my-application>的日誌群組。 -
名為
kinesis-analytics-log-stream的日誌串流。
設定您的本機開發環境
對於開發和偵錯,您可以直接從您選擇的 IDE 在機器上執行 Apache Flink 應用程式。任何 Apache Flink 相依性都會像一般 Java 相依性一樣使用 Apache Maven 來處理。
注意
在您的開發機器上,您必須安裝 Java JDK 11、Maven 和 Git。我們建議您使用開發環境,例如 Eclipse Java Neon
驗證您的 AWS 工作階段
應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時,您必須擁有有效的已 AWS 驗證工作階段,具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段:
-
如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔,請參閱 設定 AWS Command Line Interface (AWS CLI)。
-
透過發佈下列測試記錄,確認您的 AWS CLI 已正確設定,且您的使用者具有寫入 Kinesis 資料串流的許可:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
如果您的 IDE 有要整合的外掛程式 AWS,您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊,請參閱 AWS Toolkit for IntelliJ IDEA
和 AWS Toolkit for Eclipse。
下載並檢查 Apache Flink 串流 Java 程式碼
此範例的 Java 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼,請執行下列動作:
-
使用以下指令複製遠端儲存庫:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
導覽至
amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted目錄。
檢閱應用程式元件
應用程式完全在 com.amazonaws.services.msf.BasicStreamingJob類別中實作。main() 方法定義處理串流資料和執行資料的資料流程。
注意
為了獲得最佳化的開發人員體驗,應用程式設計為在 Amazon Managed Service for Apache Flink 和本機上執行,在 IDE 中進行開發時不會有任何程式碼變更。
-
若要讀取執行時間組態,以便在 Amazon Managed Service for Apache Flink 和 IDE 中執行時運作,應用程式會自動偵測它是否在 IDE 中於本機獨立執行。在這種情況下,應用程式會以不同的方式載入執行期組態:
-
當應用程式偵測到其在您的 IDE 中以獨立模式執行時,請形成包含在專案資源資料夾中
application_properties.json的檔案。檔案的內容如下。 -
當應用程式在 Amazon Managed Service for Apache Flink 中執行時,預設行為會從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入應用程式組態。請參閱 建立和設定 Managed Service for Apache Flink 應用程式。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
main()方法定義應用程式資料流程並執行它。-
初始化預設串流環境。在此範例中,我們會示範如何建立
StreamExecutionEnvironment要與 DataSteam API 搭配使用的 ,以及StreamTableEnvironment要與 SQL 和資料表 API 搭配使用的 。兩個環境物件是相同執行時間環境的兩個不同參考,以使用不同的 APIs。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); -
載入應用程式組態參數。這會自動從正確的位置載入它們,視應用程式執行的位置而定:
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
應用程式使用 Kinesis Consumer
連接器定義來源,以從輸入串流讀取資料。輸入串流的組態在 PropertyGroupId= 中定義InputStream0。串流的名稱和區域分別位於名為stream.name和 的屬性中aws.region。為了簡化,此來源會將記錄讀取為字串。private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... } -
應用程式接著會使用 Kinesis Streams Sink
連接器定義接收器,將資料傳送至輸出串流。輸出串流名稱和區域在 PropertyGroupId= 中定義OutputStream0,類似於輸入串流。接收器會直接連接到從來源DataStream取得資料的 內部。在實際的應用程式中,您會在來源和接收之間進行一些轉換。private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... } -
最後,執行您剛定義的資料流程。這必須是
main()方法的最後一個指示,在您定義所有運算子之後,資料流程需要:env.execute("Flink streaming Java API skeleton");
-
使用 pom.xml 檔案
pom.xml 檔案會定義應用程式所需的所有相依性,並設定 Maven Shade 外掛程式來建置包含 Flink 所需所有相依性的 fat-jar。
-
有些相依性具有
provided範圍。當應用程式在 Amazon Managed Service for Apache Flink 中執行時,這些相依性會自動可用。它們需要用來編譯應用程式,或在 IDE 中本機執行應用程式。如需詳細資訊,請參閱在本機執行您的應用程式。請確定您使用的 Flink 版本與您在 Amazon Managed Service for Apache Flink 中使用的執行時間相同。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
您必須使用預設範圍將額外的 Apache Flink 相依性新增至 pom,例如此應用程式使用的 Kinesis 連接器
。如需詳細資訊,請參閱使用 Apache Flink 連接器。您也可以新增應用程式所需的任何其他 Java 相依性。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
-
Maven Java 編譯器外掛程式可確保程式碼是根據 Apache Flink 目前支援的 JDK 版本 Java 11 編譯。
-
Maven Shade 外掛程式會封裝 fat-jar,不包括執行時間提供的一些程式庫。它也會指定兩個轉換器:
ServicesResourceTransformer和ManifestResourceTransformer。後者會設定 類別,其中包含啟動應用程式的main方法。如果您重新命名主類別,請不要忘記更新此轉換器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
將範例記錄寫入輸入串流
在本節中,您將傳送範例記錄到串流,供應用程式處理。您可以使用 Python 指令碼或 Kinesis Data Generator
使用 Python 指令碼產生範例資料
您可以使用 Python 指令碼將範例記錄傳送至串流。
注意
若要執行此 Python 指令碼,您必須使用 Python 3.x 並安裝AWS 適用於 Python (Boto) 的 SDK
若要開始將測試資料傳送至 Kinesis 輸入串流:
-
從資料產生器 GitHub 儲存庫下載資料產生器
stock.pyPython 指令碼。 -
執行
stock.py指令碼:$ python stock.py
在您完成教學課程的其餘部分時,請讓指令碼保持執行。您現在可以執行 Apache Flink 應用程式。
使用 Kinesis Data Generator 產生範例資料
或者,若要使用 Python 指令碼,您可以使用託管版本
若要設定和執行 Kinesis Data Generator:
-
遵循 Kinesis Data Generator 文件
中的指示來設定對工具的存取。您將執行設定使用者和密碼的 CloudFormation 範本。 -
透過 CloudFormation 範本產生的 URL 存取 Kinesis Data Generator。CloudFormation 範本完成後,您可以在輸出索引標籤中找到 URL。
-
設定資料產生器:
-
區域:選取您用於本教學課程的區域:us-east-1
-
串流/交付串流:選取應用程式將使用的輸入串流:
ExampleInputStream -
每秒記錄數:100
-
記錄範本:複製並貼上下列範本:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
測試範本:選擇測試範本,並確認產生的記錄與以下內容類似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
啟動資料產生器:選擇選取傳送資料。
Kinesis Data Generator 現在正在將資料傳送至 ExampleInputStream。
在本機執行您的應用程式
您可以在 IDE 中的本機執行和偵錯 Flink 應用程式。
注意
繼續之前,請確認輸入和輸出串流是否可用。請參閱 建立兩個 Amazon Kinesis 資料串流。此外,請確認您具有從兩個串流讀取和寫入的許可。請參閱 驗證您的 AWS 工作階段。
設定本機開發環境需要 Java 11 JDK、Apache Maven 和 以及適用於 Java 開發的 IDE。確認您符合必要的先決條件。請參閱 滿足完成練習的先決條件。
將 Java 專案匯入 IDE
若要開始在 IDE 中使用應用程式,您必須將其匯入為 Java 專案。
您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中,將./java/GettingStarted子目錄中的內容匯入 IDE。
使用 Maven 將程式碼插入為現有的 Java 專案。
注意
匯入新 Java 專案的確切程序取決於您使用的 IDE。
檢查本機應用程式組態
在本機執行時,應用程式會使用 下專案資源資料夾中 application_properties.json 檔案中的組態./src/main/resources。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
設定 IDE 執行組態
您可以執行主要類別 ,直接從 IDE 執行和偵錯 Flink 應用程式com.amazonaws.services.msf.BasicStreamingJob,就像執行任何 Java 應用程式一樣。在執行應用程式之前,您必須設定執行組態。設定取決於您使用的 IDE。例如,請參閱 IntelliJ IDEA 文件中的執行/偵錯組態
-
將
provided相依性新增至 classpath。這是必要的,以確保在本機執行時,具有provided範圍的相依性會傳遞至應用程式。如果沒有此設定,應用程式會立即顯示class not found錯誤。 -
傳遞 AWS 登入資料以存取應用程式的 Kinesis 串流。最快的方法是使用 AWS Toolkit for IntelliJ IDEA
。在執行組態中使用此 IDE 外掛程式,您可以選取特定 AWS 設定檔。 AWS 身分驗證會使用此設定檔進行。您不需要直接傳遞 AWS 登入資料。 -
確認 IDE 使用 JDK 11 執行應用程式。
在 IDE 中執行應用程式
設定 的執行組態後BasicStreamingJob,您可以像一般 Java 應用程式一樣執行或偵錯組態。
注意
您無法java -jar ...直接從命令列使用 執行 Maven 產生的 fat-jar。此 jar 不包含獨立執行應用程式所需的 Flink 核心相依性。
當應用程式成功啟動時,它會記錄有關獨立迷你叢集和連接器初始化的一些資訊。這後面接著一些 INFO 和一些 WARN 日誌,Flink 通常會在應用程式啟動時發出。
13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....
初始化完成後,應用程式不會發出任何進一步的日誌項目。資料流動時,不會發出日誌。
若要驗證應用程式是否正確處理資料,您可以檢查輸入和輸出 Kinesis 串流,如下節所述。
注意
不發出有關流動資料的日誌是 Flink 應用程式的正常行為。在每個記錄上發出日誌對於偵錯可能很方便,但在生產環境中執行時可能會增加大量的額外負荷。
觀察 Kinesis 串流中的輸入和輸出資料
您可以使用 Amazon Kinesis 主控台中的資料檢視器,觀察 (產生範例 Python) 或 Kinesis Data Generator (連結) 傳送至輸入串流的記錄。 Amazon Kinesis
若要觀察記錄
在以下網址開啟 Kinesis 主控台:https://console.aws.amazon.com/kinesis
。 -
確認您執行本教學課程的區域相同,預設為 us-east-1 美國東部 (維吉尼亞北部)。如果區域不相符,請變更區域。
-
選擇資料串流。
-
選取您要觀察的串流,或
ExampleInputStreamExampleOutputStream. -
選擇資料檢視器標籤。
-
選擇任何碎片,保持最新為開始位置,然後選擇取得記錄。您可能會看到「找不到此請求的記錄」錯誤。若是如此,請選擇重試取得記錄。發佈至串流顯示的最新記錄。
-
選擇資料欄中的值,以 JSON 格式檢查記錄的內容。
停止應用程式在本機執行
停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於您使用的 IDE。
編譯和封裝您的應用程式程式碼
在本節中,您會使用 Apache Maven 編譯 Java 程式碼,並將其封裝成 JAR 檔案。您可以使用 Maven 命令列工具或 IDE 來編譯和封裝程式碼。
若要使用 Maven 命令列編譯和封裝:
移至包含 Java GettingStarted 專案的目錄,並執行下列命令:
$ mvn package
若要使用 IDE 編譯和封裝:
mvn package 從 IDE Maven 整合執行 。
在這兩種情況下,都會建立下列 JAR 檔案:target/amazon-msf-java-stream-app-1.0.jar。
注意
從 IDE 執行「建置專案」可能不會建立 JAR 檔案。
上傳應用程式碼 JAR 檔案
在本節中,您將您在上一節中建立的 JAR 檔案上傳至在本教學課程開始時建立的 Amazon Simple Storage Service (Amazon S3) 儲存貯體。如果您尚未完成此步驟,請參閱 (連結)。
上傳應用程式碼 JAR 檔案
開啟位於 https://console.aws.amazon.com/s3/
的 Amazon S3 主控台。 -
選擇您先前為應用程式碼建立的儲存貯體。
-
選擇上傳。
-
選擇 Add files (新增檔案)。
-
導覽至上一個步驟中產生的 JAR 檔案:
target/amazon-msf-java-stream-app-1.0.jar。 -
選擇上傳,而不變更任何其他設定。
警告
請務必在 中選取正確的 JAR 檔案<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar。
target 目錄也包含您不需要上傳的其他 JAR 檔案。
建立和設定 Managed Service for Apache Flink 應用程式
您可以使用主控台或 AWS CLI建立和執行 Managed Service for Apache Flink 應用程式。在本教學課程中,您將使用 主控台。
注意
當您使用 主控台建立應用程式時,系統會為您建立 AWS Identity and Access Management (IAM) 和 Amazon CloudWatch Logs 資源。當您使用 建立應用程式時 AWS CLI,您可以分別建立這些資源。
建立應用程式
建立應用程式
登入 AWS 管理主控台,並在 https://https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。
-
確認已選取正確的區域:us-east-1 美國東部 (維吉尼亞北部)
-
開啟右側的選單,然後選擇 Apache Flink 應用程式,然後選擇建立串流應用程式。或者,在初始頁面的入門容器中選擇建立串流應用程式。
-
在建立串流應用程式頁面上:
-
選擇設定串流處理應用程式的方法:選擇從頭開始建立。
-
Apache Flink 組態、Application Flink 版本:選擇 Apache Flink 1.20。
-
-
設定您的應用程式
-
應用程式名稱:輸入
MyApplication。 -
描述:輸入
My java test app。 -
存取應用程式資源:選擇
kinesis-analytics-MyApplication-us-east-1使用必要政策建立/更新 IAM 角色。
-
-
設定您的範本以進行應用程式設定
-
範本:選擇開發。
-
-
選擇頁面底部的建立串流應用程式。
注意
使用主控台建立 Managed Service for Apache Flink 應用程式時,可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名:
-
政策:
kinesis-analytics-service-MyApplication-us-east-1 -
角色:
kinesisanalytics-MyApplication-us-east-1
Amazon Managed Service for Apache Flink 先前稱為 Kinesis Data Analytics。自動建立的資源名稱會加上 字首,kinesis-analytics-以確保回溯相容性。
編輯 IAM 政策
編輯 IAM 政策來新增存取 Kinesis 資料串流的許可。
編輯政策
前往 https://console.aws.amazon.com/iam/
開啟 IAM 主控台。 -
選擇政策。選擇主控台為您在上一節所建立的
kinesis-analytics-service-MyApplication-us-east-1政策。 -
選擇編輯,然後選擇 JSON 索引標籤。
-
將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (
012345678901)。 -
選擇頁面底部的下一步,然後選擇儲存變更。
設定應用程式
編輯應用程式組態以設定應用程式程式碼成品。
編輯組態
-
在 MyApplication 頁面,選擇設定。
-
在應用程式碼位置區段中:
-
針對 Amazon S3 儲存貯體,選取您先前為應用程式碼建立的儲存貯體。選擇瀏覽並選取正確的儲存貯體,然後選取選擇。請勿按一下儲存貯體名稱。
-
對於 Amazon S3 物件的路徑,請輸入
amazon-msf-java-stream-app-1.0.jar。
-
-
針對存取許可,選擇
kinesis-analytics-MyApplication-us-east-1使用必要政策建立/更新 IAM 角色。 -
在執行期屬性區段中,新增下列屬性。
-
選擇新增項目並新增下列每個參數:
群組 ID 金鑰 值 InputStream0stream.nameExampleInputStreamInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1 -
請勿修改任何其他區段。
-
選擇儲存變更。
注意
當您選擇啟用 Amazon CloudWatch 日誌時,Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示:
-
日誌群組:
/aws/kinesis-analytics/MyApplication -
日誌串流:
kinesis-analytics-log-stream
執行應用程式
應用程式現在已設定並準備好執行。
執行應用程式
-
在 Amazon Managed Service for Apache Flink 的 主控台上,選擇我的應用程式,然後選擇執行。
-
在下一頁的應用程式還原組態頁面上,選擇使用最新的快照執行,然後選擇執行。
應用程式詳細資訊中的狀態會從
Ready轉換為Starting,然後在應用程式啟動Running時轉換為 。
當應用程式處於 Running 狀態時,您現在可以開啟 Flink 儀表板。
開啟 儀表板
-
選擇開啟 Apache Flink 儀表板。儀表板會在新頁面上開啟。
-
在執行中任務清單中,選擇您可以看到的單一任務。
注意
如果您設定執行期屬性或不正確地編輯 IAM 政策,應用程式狀態可能會變成
Running,但 Flink 儀表板會顯示任務正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的許可,這是常見的失敗案例。發生這種情況時,請檢查 Flink 儀表板中的例外狀況索引標籤,以查看問題的原因。
觀察執行中應用程式的指標
在 MyApplication 頁面的 Amazon CloudWatch 指標區段中,您可以從執行中的應用程式查看一些基本指標。
檢視指標
-
在重新整理按鈕旁,從下拉式清單中選取 10 秒。
-
當應用程式執行正常時,您可以看到運作時間指標持續增加。
-
fullrestarts 指標應為零。如果增加,組態可能會發生問題。若要調查問題,請檢閱 Flink 儀表板上的例外狀況索引標籤。
-
在運作狀態良好的應用程式中,失敗檢查點指標的數量應為零。
注意
此儀表板會顯示一組固定的指標,精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。
觀察 Kinesis 串流中的輸出資料
請確定您仍在使用 Python 指令碼或 Kinesis Data Generator 將資料發佈至輸入。
您現在可以使用 https://console.aws.amazon.com/kinesis/
檢視輸出
在以下網址開啟 Kinesis 主控台:https://console.aws.amazon.com/kinesis
。 -
確認區域與您用來執行本教學課程的區域相同。根據預設,它是 us-east-1US East (N. Virginia)。視需要變更 區域。
-
選擇資料串流。
-
選取您要觀察的串流。在本教學課程中,使用
ExampleOutputStream。 -
選擇資料檢視器標籤。
-
選取任何碎片,保持最新為開始位置,然後選擇取得記錄。您可能會看到「找不到此請求的記錄」錯誤。若是如此,請選擇重試取得記錄。發佈至串流顯示的最新記錄。
-
選取資料欄中的值,以 JSON 格式檢查記錄的內容。
停止應用程式
若要停止應用程式,請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面MyApplication。
停止應用程式
-
從動作下拉式清單中,選擇停止。
-
應用程式詳細資訊中的狀態會從 轉換為
RunningStopping,然後在應用程式完全停止Ready時轉換為 。注意
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。