

在仔細考慮之後，我們決定停止 Amazon Kinesis Data Analytics for SQL 應用程式：

1. 從 **2025 年 9 月 1 日起，**我們不會為 Amazon Kinesis Data Analytics for SQL 應用程式提供任何錯誤修正，因為考慮到即將終止，我們將對其提供有限的支援。

2. 從 **2025 年 10 月 15 日起，**您將無法建立新的 Kinesis Data Analytics for SQL 應用程式。

3. 我們將自 **2026 年 1 月 27** 日起刪除您的應用程式。您將無法啟動或操作 Amazon Kinesis Data Analytics for SQL 應用程式。從那時起，Amazon Kinesis Data Analytics for SQL 將不再提供支援。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式終止](discontinuation.md)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 範例：視窗與彙總
<a name="examples-window"></a>

本節提供使用視窗語彙總查詢的 Amazon Kinesis Data Analytics 應用程式範例。(如需詳細資訊，請參閱 [窗口化查詢](windowed-sql.md)。) 每個範例皆包含逐步指示與範例程式碼，協助您建立 Kinesis Data Analytics 應用程式並測試結果。

**Topics**
+ [範例：交錯視窗](examples-window-stagger.md)
+ [範例：使用列時間的輪轉窗口](examples-window-tumbling-rowtime.md)
+ [範例：使用事件時間戳記的輪轉窗口](examples-window-tumbling-event.md)
+ [範例：擷取最常發生的值 (TOP\$1K\$1ITEMS\$1TUMBLING)](examples-window-topkitems.md)
+ [範例：從查詢彙總部分結果](examples-window-partialresults.md)

# 範例：交錯視窗
<a name="examples-window-stagger"></a>

當視窗化查詢為每個不同的分割區索引鍵處理個別視窗時，從具有相符索引鍵的資料到達時開始，即為*交錯視窗*。如需詳細資訊，請參閱 [交錯窗口](stagger-window-concepts.md)。這個 Amazon Kinesis Data Analytics 範例使用 EVENT\$1TIME 和 TICKER 欄來建立交錯視窗。來源串流包含六個記錄的群組，其中有相同的 EVENT\$1TIME 和 TICKER 值，這些資料會在一分鐘內到達，但不一定具有相同的分鐘值 (例如 `18:41:xx`)。

在此範例中，將下列記錄於指定時間寫入 Amazon Kinesis 資料串流。指令碼不會將時間寫入串流，但應用程式擷取記錄的時間會寫入 `ROWTIME` 欄位：

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有兩個資料欄 (`EVENT_TIME` 和 `TICKER`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含價格和股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


您可以將應用程式碼與 `COUNT` 函數搭配使用，以建立資料的視窗化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_stagger.png)


在下列程序中，您會建立 Kinesis Data Analytics 應用程式，根據 EVENT\$1TIME 和 TICKER，在交錯視窗彙總輸入串流中的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-stagger-window-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-stagger-window-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-stagger-window-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會在一分鐘的時間內，連續將具有相同隨機 `EVENT_TIME` 和股票代號的六筆記錄寫入串流。讓指令碼持續執行，以便在稍後的步驟產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-stagger-window-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有兩個資料欄。

   1. 選擇**編輯結構描述**。將 **EVENT\$1TIME** 欄的**欄類型**變更為 `TIMESTAMP`。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：使用列時間的輪轉窗口
<a name="examples-window-tumbling-rowtime"></a>

當窗口查詢以非重疊的方式處理每個窗口時，即稱作*輪轉窗口*。如需詳細資訊，請參閱[輪轉窗口（使用 GROUP BY 彙總）](tumbling-window-concepts.md)。此 Amazon Kinesis Data Analytics 範例使用 `ROWTIME` 資料欄來建立輪轉窗口。該 `ROWTIME` 欄示應用程式讀取記錄的時間。

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流。

```
{"TICKER": "TBV", "PRICE": 33.11}
{"TICKER": "INTC", "PRICE": 62.04}
{"TICKER": "MSFT", "PRICE": 40.97}
{"TICKER": "AMZN", "PRICE": 27.9}
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有兩個資料欄 (`TICKER` 和 `PRICE`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含價格和股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime_schema.png)


您可以將應用程式碼與 `MIN` 和 `MAX` 函數搭配使用，以建立資料的窗口化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime.png)


在下列程序中，建立 Kinesis Data Analytics 應用程式，以根據 ROWTIME 在輪轉窗口彙總輸入串流中的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-tumbling-window-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-tumbling-window-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-tumbling-window-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會持續將隨機股票代號記錄寫入串流。讓指令碼持續執行，以便在稍後的步驟產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-tumbling-window-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有兩個資料欄。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE)
              FROM "SOURCE_SQL_STREAM_001"
              GROUP BY TICKER, 
                  STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：使用事件時間戳記的輪轉窗口
<a name="examples-window-tumbling-event"></a>

當窗口查詢以非重疊的方式處理每個窗口時，即稱作*輪轉窗口*。如需詳細資訊，請參閱[輪轉窗口（使用 GROUP BY 彙總）](tumbling-window-concepts.md)。這個 Amazon Kinesis Data Analytics 範例示範使用事件時間戳記 (即使用者建立，包含在串流資料中的時間戳記) 的輪轉窗口。範例中使用這種方法，而非僅使用 ROWTIME，即 Kinesis Data Analytics 在應用程式收到記錄時所建立的時間戳記。如果您想要根據事件發生的時間 (而非應用程式收到事件的時間) 建立彙總，可在串流資料中使用事件時間戳記。在此範例中，`ROWTIME` 值會每分鐘觸發彙總，而記錄會依照 `ROWTIME` 和包含的事件時間兩者進行彙總。

在此範例中，將下列記錄寫入 Amazon Kinesis 串流：此 `EVENT_TIME` 值設定為過去 5 秒，以模擬處理和傳輸延遲，這些延遲可能會造成事件發生與記錄擷取至 Kinesis Data Analytics 的時間差。

```
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65}
{"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61}
{"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48}
{"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64}
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並推斷含有三個資料欄 (`EVENT_TIME`、`TICKER` 和`PRICE`) 的應用程式內結構描述，如下所示。

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含事件時間、股票代號以及價格欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_event_schema.png)


您可以將應用程式碼與 `MIN` 和 `MAX` 函數搭配使用，以建立資料的窗口化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_tumbling_event.png)


在下列程序中，建立 Kinesis Data Analytics 應用程式，該應用程式會根據事件時間在輪轉窗口中彙總輸入串流的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-window-tumbling-event-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-window-tumbling-event-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-window-tumbling-event-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會持續將隨機股票代號記錄寫入串流。讓指令碼持續執行，以便在稍後的步驟產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-window-tumbling-event-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有三個資料欄。

   1. 選擇**編輯結構描述**。將 **EVENT\$1TIME** 欄的**欄類型**變更為 `TIMESTAMP`。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後貼到編輯器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND),
              TICKER,
               MIN(PRICE) AS MIN_PRICE,
               MAX(PRICE) AS MAX_PRICE
          FROM    "SOURCE_SQL_STREAM_001"
          GROUP BY TICKER, 
                   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), 
                   STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：擷取最常發生的值 (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="examples-window-topkitems"></a>

這個 Amazon Kinesis Data Analytics 範例示範如何使用 `TOP_K_ITEMS_TUMBLING` 函數擷取輪轉窗口中最常出現的值。如需詳細資訊，請參閱《Amazon Managed Service for Apache Flink SQL 參考資料》**中的 [`TOP_K_ITEMS_TUMBLING` 函數](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/top-k.html)。

此 `TOP_K_ITEMS_TUMBLING` 功能在彙總超過數萬或數十萬個金鑰，且想要減少資源使用量時非常有用。該函數產生與 `GROUP BY` 和 `ORDER BY` 子句匯總相同的結果。

在此範例中，將下列記錄寫入 Amazon Kinesis 資料串流：

```
{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...
```



然後，您可以在 中建立 Kinesis Data Analytics 應用程式 AWS 管理主控台，並將 Kinesis 資料串流做為串流來源。探索程序會讀取串流來源上的範例記錄，並以一個資料欄 (`TICKER`) 推斷應用程式內結構描述，如下所示：

![\[顯示應用程式內結構描述的主控台螢幕擷取畫面，其中包含股票代碼欄。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_topk_schema.png)


您可以將應用程式碼與 `TOP_K_VALUES_TUMBLING` 函數搭配使用，以建立資料的視窗化彙總。接著將產生的資料插入另一個應用程式內串流，如下列螢幕擷取畫面所示：



![\[顯示應用程式內串流生成資料的主控台螢幕擷取畫面。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_topk.png)


在下列程序中，建立 Kinesis Data Analytics 應用程式，以擷取輸入串流中最常出現的值。

**Topics**
+ [步驟 1：建立 Kinesis Data Stream](#examples-window-topkitems-1)
+ [步驟 2：建立 Kinesis Data Analytics 應用程式](#examples-window-topkitems-2)

## 步驟 1：建立 Kinesis Data Stream
<a name="examples-window-topkitems-1"></a>

建立 Amazon Kinesis 資料串流，並填入紀錄，如下所示：

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中選擇**資料串流**。

1. 選擇**建立 Kinesis 串流**，然後建立內含一個碎片之串流。如需詳細資訊，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[建立串流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 若要在生產環境中將記錄寫入 Kinesis 資料串流，建議您使用 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。為了簡單起見，這個例子使用下面的 Python 指令碼來生成記錄。執行程式碼以填入範例股票代號記錄。這個簡單的程式碼會持續將隨機股票代號記錄寫入串流。讓指令碼保持執行，以便在稍後的步驟中產生應用程式結構描述。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步驟 2：建立 Kinesis Data Analytics 應用程式
<a name="examples-window-topkitems-2"></a>

建立 Kinesis Data Analytics 應用程式，如下所示。

1. 前往 [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) 開啟 Managed Service for Apache Flink 主控台。

1. 選擇**建立應用程式**，輸入應用程式名稱，然後選擇**建立應用程式**。

1. 在應用程式詳細資料頁面上，選擇**連接串流資料**來連接至來源。

1. 在**連接至來源**頁面，執行下列動作：

   

   1. 選擇您在上一節建立的串流。

   1. 選擇**探索結構描述**。等待主控台顯示推斷的結構描述和範例記錄，這些記錄可用來推斷應用程式內串流所建立的結構描述。推斷的結構描述有一個資料欄。

   1. 選擇**儲存結構描述並更新串流範例**。主控台儲存結構描述後，選擇**結束**。

   1. 選擇**儲存並繼續**。

1. 在應用程式詳細資訊頁面上，選擇**至 SQL 編輯器**。若要啟動應用程式，請在出現的對話方塊中選擇**是，啟動應用程式**。

1. 在 SQL 編輯器中，編寫應用程式碼並驗證結果，如下所示：

   1. 請複製以下應用程式碼，然後在編輯器中貼上。

      ```
      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4), 
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM * 
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',         -- name of column in single quotes
                  5,                       -- number of the most frequently occurring values
                  60                       -- tumbling window size in seconds
                  )
              );
      ```

   1. 選擇 **儲存並執行 SQL**。

      在**即時分析**標籤上，您可以查看應用程式建立的所有應用程式內串流，並驗證資料。

# 範例：從查詢彙總部分結果
<a name="examples-window-partialresults"></a>

如 Amazon Kinesis 資料串流中有事件時間與擷取時間不完全相符的記錄，輪轉窗口中一些結果抵達，但不一定在窗口中發生。在這種情況下，輪轉窗口只會包含您想要的部分結果集。您可以使用以下幾種方法來修正此問題：
+ 僅使用輪轉窗口，並用 upserts 透過資料庫或資料倉儲彙總部分後處理結果。這種方法在處理應用程式時非常有效。它會無限期地處理彙總運算子 (`sum`、`min`、`max` 等等) 的延遲資料。這種方法的缺點是，您必須在資料庫層開發和維護其他應用程式邏輯。
+ 使用輪轉和滑動窗口，可提早產生部分結果，但在滑動窗口期間也會繼續產生完整的結果。這種方法使用複寫，而非 upsert 處理延遲資料，這樣就不需要在資料庫層添加額外的應用程式邏輯。此方法的缺點是它使用了更多的 Kinesis 處理單元 (KPU)，且依舊產生兩個結果，這可能不適用於某些使用案例。

如需輪轉與滑動窗口的詳細資訊，請參閱 [窗口化查詢](windowed-sql.md)。

在下列程序中，輪轉窗口彙總會產生兩個部分結果 (傳送至 `CALC_COUNT_SQL_STREAM` 應用程式內串流)，必須結合才能產生最終結果。接著，應用程式會產生第二個彙總 (傳送至 `DESTINATION_SQL_STREAM` 應用程式內串流)，結合兩個部分結果。

**若要建立使用事件時間彙總部分結果的應用程式**

1. 登入 AWS 管理主控台 並開啟位於 https：//[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) 的 Kinesis 主控台。

1. 在導覽窗格中，選擇**資料分析**。建立 Kinesis Data Analytics 應用程式，如 [Amazon Kinesis Data Analytics for SQL 應用程式入門](getting-started.md) 教學課程所述。

1. 在 SQL 編輯器中，以下列項目取代應用程式碼：

   ```
   CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);
   	            
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);            
   	
   CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
       INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
           COUNT(*) AS "TickerCount"
       FROM "SOURCE_SQL_STREAM_001"
       GROUP BY
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
           STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
           TICKER_SYMBOL;
   
   CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER",
           "TRADETIME",
           SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
       FROM "CALC_COUNT_SQL_STREAM"
       WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
   ```

   應用程式碼中的 `SELECT` 陳述式會篩選 `SOURCE_SQL_STREAM_001` 中的資料欄，找出變更大於 1% 的股票價格，並使用幫浦將這些資料列插入另一個應用程式內串流 `CHANGE_STREAM`。

1. 選擇 **儲存並執行 SQL**。

第一個幫浦會將串流輸出至 `CALC_COUNT_SQL_STREAM` ，類似下列內容。請注意，結果集不完整：

![\[主控台螢幕擷取畫面顯示部分結果。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_partial_0.png)


然後，第二個幫浦輸出一個包含完整結果集的串流至 `DESTINATION_SQL_STREAM`：

![\[主控台螢幕擷取畫面顯示完整結果。\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/ex_partial_1.png)
