

在仔細考慮之後，我們決定停止 Amazon Kinesis Data Analytics for SQL 應用程式：

1. 從 **2025 年 9 月 1 日起，**我們不會為 Amazon Kinesis Data Analytics for SQL 應用程式提供任何錯誤修正，因為考慮到即將終止，我們將對其提供有限的支援。

2. 從 **2025 年 10 月 15 日起，**您將無法建立新的 Kinesis Data Analytics for SQL 應用程式。

3. 我們將自 **2026 年 1 月 27** 日起刪除您的應用程式。您將無法啟動或操作 Amazon Kinesis Data Analytics for SQL 應用程式。從那時起，Amazon Kinesis Data Analytics for SQL 將不再提供支援。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式終止](discontinuation.md)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 設定應用程式輸入
<a name="how-it-works-input"></a>

您的 Amazon Kinesis Data Analytics 應用程式可以從單一串流來源接收輸入，並選擇性地使用一個參考資料來源。如需詳細資訊，請參閱[Amazon Kinesis Data Analytics for SQL 應用程式：運作方式](how-it-works.md)。本主題中的各節說明應用程式輸入來源。

**Topics**
+ [

## 設定串流來源
](#source-streaming)
+ [

## 配置參考來源
](#source-reference)
+ [

# 使用 JSONPath
](about-json-path.md)
+ [

# 將串流來源元素映射至 SQL 輸入資料欄
](sch-mapping.md)
+ [

# 在串流資料上使用結構描述探索功能
](sch-dis.md)
+ [

# 在靜態資料上使用結構描述探索功能
](sch-dis-ref.md)
+ [

# 使用 Lambda 函數預處理資料
](lambda-preprocessing.md)
+ [

# 平行化輸入串流以提高輸送量
](input-parallelism.md)

## 設定串流來源
<a name="source-streaming"></a>

當您建立應用程式時，就要指定串流來源。您也可以在建立應用程式後修改輸入。Amazon Kinesis Data Analytics 支援您的應用程式採用下列串流來源：
+ Kinesis 資料串流 
+ Firehose 交付串流

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法使用 Kinesis Data Firehose 做為建立新應用程式的來源。搭配使用 Kinesis Data Analytics for SQL 應用程式與 `KinesisFirehoseInput` 的現有客戶，可以繼續使用 `KinesisFirehoseInput`，在使用 Kinesis Data Analytics 的現有帳戶中新增應用程式。如果您是現有客戶，並且希望使用 Kinesis Data Analytics for SQL 與 `KinesisFirehoseInput` 建立新帳戶，您可以使用增加服務限制額度表單來開立案例。如需詳細資訊，請參閱 [AWS 支援 中心](https://console.aws.amazon.com/support/home#/)。我們建議在推廣生產之前，務必測試任何新的應用程式。

**注意**  
如果 Kinesis 資料串流經過加密，Kinesis Data Analytics 就能順暢存取加密串流中的資料，無需進一步設定。Kinesis Data Analytics 不會儲存從 Kinesis 資料串流讀取的未加密資料。如需詳細資訊，請參閱[什麼是 Kinesis Data Streams 伺服器端加密？](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html)。

Kinesis Data Analytics 會持續輪詢串流來源是否有新資料，並根據輸入組態將其擷取至應用程式內串流。

**注意**  
將 Kinesis Stream 新增為應用程式的輸入不會影響串流中的資料。如果 Firehose 交付串流等其他資源也存取相同的 Kinesis 串流，則 Firehose 交付串流和 Kinesis Data Analytics 應用程式都會收到相同的資料。但是，輸送量和限流可能會受到影響。

應用程式碼會查詢應用程式內串流 。作為輸入組態的一部分，您提供以下項目：
+ **串流來源**：您提供的串流 Amazon Resource Name (ARN) 和 IAM 角色，可讓 Kinesis Data Analytics 代您存取串流。
+ **應用程式內串流名稱字首**：當您啟動應用程式時，Kinesis Data Analytics 會建立指定的應用程式內串流。在應用程式碼中，您可以使用此名稱存取應用程式內串流。

  您可以選擇性地將串流來源映射至多個應用程式內串流。如需詳細資訊，請參閱[限制](limits.md)。在這種情況下，Amazon Kinesis Data Analytics 會建立指定數量的應用程式內串流，名稱如下：*字首* `_001`、*字首* `_002` 和*字首* `_003`。根據預設，Kinesis Data Analytics 會將串流來源映射到一個名為*字首* `_001` 的應用程式內串流。

  您在應用程式內串流中插入資料列的速率有其限制。因此，Kinesis Data Analytics 支援多個此類應用程式內串流，讓您更快速地將記錄匯入應用程式。如果發現應用程式未跟上串流來源中的資料，您可以新增平行處理單位以改善效能。
+ **映射結構描述**：描述串流來源上的記錄格式 (JSON、CSV)。同時也描述串流上的每條記錄，如何映射至所建立應用程式內串流的資料欄。這是您提供資料欄名稱和資料類型的地方。

**注意**  
建立輸入應用程式內串流時，Kinesis Data Analytics 會在識別符 (串流名稱和資料欄名稱) 周圍加上引號。查詢此串流和資料欄時，您必須使用相同的大小寫 (完全符合小寫和大寫字母) ，並以引號指定。如需識別符的詳細資訊，請參閱《Amazon Managed Service for Apache Flink SQL 參考資料》**中的[識別符](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html)。

您可以在 Amazon Kinesis Data Analytics 主控台中建立應用程式並設定輸入。然後，主控台進行必要的 API 呼叫。在建立新應用程式 API，或將輸入組態新增至現有應用程式時，您可以設定應用程式輸入。如需詳細資訊，請參閱 [CreateApplication](API_CreateApplication.md) 及 [AddApplicationInput](API_AddApplicationInput.md)。以下是 `Createapplication` API 請求主體的輸入組態部分：

```
 "Inputs": [
        {
            "InputSchema": {
                "RecordColumns": [
                    {
                        "Mapping": "string",
                        "Name": "string",
                        "SqlType": "string"
                    }
                ],
                "RecordEncoding": "string",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": "string",
                            "RecordRowDelimiter": "string"
                        },
                        "JSONMappingParameters": {
                            "RecordRowPath": "string"
                        }
                    },
                    "RecordFormatType": "string"
                }
            },
            "KinesisFirehoseInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "KinesisStreamsInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "Name": "string"
        }
    ]
```

## 配置參考來源
<a name="source-reference"></a>

您也可以選擇將參考資料來源新增至現有的應用程式，以豐富來自串流來源的資料。您必須將參考資料作為物件存放在 S3 儲存貯體中。Amazon Kinesis Data Analytics 會在應用程式啟動時讀取 Amazon S3 物件，並建立應用程式內參考資料表。接著，您的應用程式程式碼就可以將其與應用程式內串流聯結。

您可以使用支援的格式 (CSV、JSON) 將參考資料存放在 Amazon S3 物件中。舉例來說，假設您的應用程式對股票訂單執行分析。假設串流來源採用以下記錄格式：

```
Ticker, SalePrice, OrderId

AMZN     $700        1003
XYZ      $250        1004
...
```

在這種情況下，您可以考慮維護一個參考資料來源，以提供每個股票代號的詳細資料，例如公司名稱

```
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
```

您可以使用 API 或主控台新增應用程式參考資料來源。Amazon Kinesis Data Analytics 提供下列 API 動作來管理參考資料來源：
+  [AddApplicationReferenceDataSource](API_AddApplicationReferenceDataSource.md)
+ [UpdateApplication](API_UpdateApplication.md)

如需使用主控台新增任務的詳細資訊，請參閱 [範例：將參考資料新增至 Kinesis Data Analytics 應用程式](app-add-reference-data.md)。

注意下列事項：
+ 如果應用程式正在執行，Kinesis Data Analytics 會建立應用程式內參考資料表，然後立即載入參考資料。
+ 如果應用程式未執行 (例如處於就緒狀態)，Kinesis Data Analytics 只會儲存更新的輸入組態。當應用程式開始執行時，Kinesis Data Analytics 會將參考資料以表格形式載入您的應用程式。

假設在 Kinesis Data Analytics 建立應用程式內參考資料表之後，您想要重新整理資料。也許你更新了 Amazon S3 物件，或者你想使用不同的 Amazon S3 物件。在此情況下，您可以明確呼叫 [UpdateApplication](API_UpdateApplication.md)，或選擇主控台中的**動作**、**同步化參考資料表**。Kinesis Data Analytics 不會自動重新整理應用程式內參考資料表。

您可以建立為參考資料來源的 Amazon S3 物件大小有其限制。如需詳細資訊，請參閱[限制](limits.md)。如果物件大小超過限制，Kinesis Data Analytics 將無法載入資料。應用程式狀態會顯示為執行中，但不會讀取資料。

當新增參考資料來源時，您要提供以下資訊：
+ **S3 儲存貯體和物件金鑰名稱**：除了儲存貯體名稱和物件金鑰之外，您還要提供一個 Kinesis Data Analytics 可以擔任的 IAM 角色，以代表您讀取物件。
+ **應用程式內參考資料表名稱**：Kinesis Data Analytics 會建立此應用程式內表格，並透過讀取 Amazon S3 物件來填入資料。這是您在應用程式碼中指定的資料表名稱。
+ **映射結構描述**：描述記錄格式 (JSON、CSV)，以及儲存在 Amazon S3 物件中的資料編碼。同時也說明每個資料元素如何對應至應用程式內參考資料表中的資料欄。

以下顯示 `AddApplicationReferenceDataSource` API 請求中的請求主體。

```
{
    "applicationName": "string",
    "CurrentapplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
```

# 使用 JSONPath
<a name="about-json-path"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法使用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

JSONPath 是查詢 JSON 物件元素的標準化方式。JSONPath 使用路徑表達式導覽 JSON 文件中的元素、巢狀元素及陣列。如需 JSON 的詳細資訊，請參閱[介紹 JSON](http://www.json.org/)。

Amazon Kinesis Data Analytics 會在應用程式的來源結構描述中使用 JSONPath 表達式，藉此在內含 JSON 格式資料的串流來源中識別資料元素。

如需將串流資料映射至應用程式輸入串流的詳細資訊，請參閱 [將串流來源元素映射至 SQL 輸入資料欄](sch-mapping.md)。

## 使用 JSONPath 存取 JSON 元素
<a name="about-json-path-elements"></a>

接下來，你可以找到如何使用 JSONPath 表達式來存取 JSON 格式資料的各種元素。針對本節中的範例，假設來源串流包含下列 JSON 記錄：

```
{
  "customerName":"John Doe",
  "address":
  {
    "streetAddress":
    [
      "number":"123",
      "street":"AnyStreet"
    ],
    "city":"Anytown"
  }
  "orders":
  [
    { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
    { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
    { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
  ]
}
```

### 存取 JSON 元素
<a name="about-json-path-firstlevel"></a>

若要用 JSONPath 查詢 JSON Data 中的元素，請使用以下語法。在此，`$` 表示資料階層的根，`elementName` 是要查詢的元素節點名稱。

```
$.elementName
```

下列表達式會查詢上述 JSON 範例中的 `customerName` 元素。

```
$.customerName
```

上述表達式會從前面的 JSON 記錄傳回下列項目。

```
John Doe
```

**注意**  
路徑表達式區分大小寫。表達式 `$.customername` 會從前面的 JSON 範例傳回 `null`。

**注意**  
如果路徑表達式指定的位置沒有出現任何元素，則表達式會傳回 `null`。下列表達式會從前面的 JSON 範例傳回 `null`，因為沒有相符的元素。  

```
$.customerId
```

### 存取巢狀 JSON 元素
<a name="about-json-path-nested"></a>

若要查詢巢狀 JSON 元素，請使用下列語法。

```
$.parentElement.element
```

下列表達式會查詢上述 JSON 範例中的 `city` 元素。

```
$.address.city
```

上述表達式會從前面的 JSON 記錄傳回下列項目。

```
Anytown
```

您可以使用以下語法查詢更多層次的子元素。

```
$.parentElement.element.subElement
```

下列表達式會查詢上述 JSON 範例中的 `street` 元素。

```
$.address.streetAddress.street
```

上述表達式會從前面的 JSON 記錄傳回下列項目。

```
AnyStreet
```

### 存取陣列
<a name="about-json-path-arrays"></a>

您可以通過以下方式存取 JSON 陣列中的資料：
+ 將陣列中的所有元素擷取為單一資料列。
+ 將陣列中的每個元素擷取為單獨的列。

#### 將陣列中的所有元素擷取為單一資料列。
<a name="about-json-path-arrays-row"></a>

若要將陣列的全部內容查詢為單一資料列，請使用下列語法。

```
$.arrayObject[0:]
```

下列表達式會查詢本節使用之上述 JSON 範例中的 `orders` 元素完整內容。它用單欄單列傳回陣列內容。

```
$.orders[0:]
```

上述表達式會從本節使用之範例 JSON 記錄傳回下列項目。

```
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"},{"orderId":"61322","itemName":"Gadget","itemPrice":"22.50"},{"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]
```

#### 將陣列中的所有元素擷取為單一資料列。
<a name="about-json-path-arrays-separate"></a>

若要將陣列中的個別元素查詢為單獨的資料列，請使用下列語法。

```
$.arrayObject[0:].element
```

下列表達式會查詢前述 JSON 範例中的 `orderId` 元素，並將每個陣列元素傳回為個別的資料列。

```
$.orders[0:].orderId
```

上述表達式會從前面的 JSON 記錄傳回下列項目，每個資料項目都會以個別的資料列傳回。


****  

|  | 
| --- |
|  23284  | 
|  63122  | 
|  77284  | 

**注意**  
如果查詢非陣列元素的表達式包含在查詢個別陣列元素的結構描述中，則會針對陣列中的每個元素重複非陣列元素。舉例來說，假設上述 JSON 範例的結構描述包含下列表達式：  
\$1.customerName
\$1.orders[0:].orderId
在這種情況下，來自範例輸入串流元素的傳回資料列類似於下列項目，每個 `orderId` 元素都會重複 `name` 元素。  


****  

|  |  | 
| --- |--- |
|  John Doe  |  23284  | 
|  John Doe  |  63122  | 
|  John Doe  |  77284  | 

**注意**  
下列限制適用於 Amazon Kinesis Data Analytics 中的陣列表達式：  
陣列表達式僅支援一個層級的解除參考。不支援下列表達式格式。  

  ```
  $.arrayObject[0:].element[0:].subElement
  ```
結構描述中只能展平一個陣列。可以參考多個陣列 — 傳回為包含陣列中所有元素的一列。但是，只有一個陣列可以將其中的每個元素作為單獨的列傳回。  
包含以下格式元素的結構描述是有效的。這種格式會傳回第二個陣列的內容作為單一欄，在第一個陣列中的每個元素重複。  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:]
  ```
包含以下格式元素的結構描述是無效的。  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:].element
  ```

## 其他考量
<a name="about-json-path-other"></a>

使用 JSONPath 的其他注意事項如下：
+ 如果在應用程式結構描述中，JSONPath 表達式的個別元素沒有存取任何陣列，則會針對處理的每個 JSON 記錄，在應用程式的輸入串流中建立單一資料列。
+ 當陣列平面化 (即其元素會以個別資料列傳回) 時，任何遺失的元素都會導致應用程式內串流中出現 Null 值。
+ 陣列永遠會平面化到至少一列。如果不會傳回任何值 (也就是陣列為空或未查詢任何元素)，則會傳回包含所有 Null 值的單一資料列。

  下列表達式會從前面的 JSON 範例傳回帶有 null 值的紀錄 ，因為指定的路徑沒有相符元素。

  ```
  $.orders[0:].itemId
  ```

  上述表達式會從前面的 JSON 範例記錄傳回下列項目。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/about-json-path.html)

## 相關主題
<a name="about-json-path.Related"></a>
+ [JSON 簡介](http://www.json.org/)

# 將串流來源元素映射至 SQL 輸入資料欄
<a name="sch-mapping"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法使用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

透過 Amazon Kinesis Data Analytics，您可以使用標準 SQL 來處理和分析 JSON 或 CSV 格式的串流資料。
+ 若要處理和分析串流 CSV 資料，請為輸入串流的資料欄指派欄名稱和資料類型。您的應用程式會依序從每個資料欄定義的輸入串流匯入一個資料欄。

  您不必在應用程式輸入串流中包含所有資料欄，但無法略過來源串流中的資料欄。例如，您可以從包含五個元素的輸入串流匯入前三個資料欄，但不能只匯入欄 1、2 和 4。
+ 若要處理和分析串流 JSON 資料，您可以使用 JSONPath 表達式，將 JSON 元素從串流來源映射至輸入串流的 SQL 資料欄。如需搭配 Amazon Kinesis Data Analytics 使用 JSONPath 的詳細資訊，請參閱 [使用 JSONPath](about-json-path.md)。在 SQL 表中的資料欄具有從 JSON 類型映射的資料類型。關於支援的資料類型，請參閱[資料類型](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)。如需將 JSON 資料轉換為 SQL 資料的詳細資訊，請參閱 [將 JSON 資料類型映射到 SQL 資料類型](#sch-mapping-datatypes)。

如需如何配置輸入串流的詳細資訊，請參閱 [設定應用程式輸入](how-it-works-input.md)。

## 將 JSON 資料映射至 SQL 資料欄
<a name="sch-mapping-json"></a>

 您可以使用 AWS 管理主控台 或 Kinesis Data Analytics API 將 JSON 元素映射至輸入資料欄。
+ 若要使用主控台將元素映射至資料欄，請參閱 [使用結構描述編輯器](console-summary-edit-schema.md)。
+ 若要使用 Kinesis Data Analytics API 將元素映射至資料欄，請參閱下節。

若要將 JSON 元素映射至應用程式內輸入串流的資料欄，您需要每個資料欄的結構描述，且其中須包含下列資訊：
+ **來源表達式：**識別資料欄資料位置的 JSONPath 表達式。
+ **資料欄名稱：**SQL 查詢用來參考資料的名稱。
+ **資料類型：**資料欄的 SQL 資料類型。

## 使用 API
<a name="sf-map-api"></a>

若要將串流來源中的元素映射至輸入資料欄，您可以使用 Kinesis Data Analytics API [CreateApplication](API_CreateApplication.md) 動作。如要建立應用程式內串流，您必須指定一個結構描述，以將資料轉換為 SQL 中使用的架構化版本。[CreateApplication](API_CreateApplication.md) 動作可以設定您的應用程式，以從單一串流來源接收輸入。若要將 JSON 元素或 CSV 資料欄映射至 SQL 資料欄，請在 [SourceSchema](API_SourceSchema.md) `RecordColumns` 陣列中建立 [RecordColumn](API_RecordColumn.md) 物件。[RecordColumn](API_RecordColumn.md) 物件包含以下結構描述：

```
{ 
    "Mapping": "String",
    "Name": "String",
    "SqlType": "String"
}
```

[RecordColumn](API_RecordColumn.md) 物件中的欄位具有下列值：
+ `Mapping`：識別輸入串流記錄資料位置的 JSONPath 表達式。CSV 格式的來源串流輸入結構描述不存在此值。
+ `Name`：應用程式內 SQL 資料串流中的資料欄名稱。
+ `SqlType`：應用程式內 SQL 資料串流中資料的類型。

### JSON 輸入結構描述範例
<a name="sf-map-api-json-example"></a>

下列範例會示範 JSON 結構描述 `InputSchema` 值的格式。

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(4)",
            "Name": "TICKER_SYMBOL",
            "Mapping": "$.TICKER_SYMBOL"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "SECTOR",
            "Mapping": "$.SECTOR"
        },
        {
            "SqlType": "TINYINT",
            "Name": "CHANGE",
            "Mapping": "$.CHANGE"
        },
        {
            "SqlType": "DECIMAL(5,2)",
            "Name": "PRICE",
            "Mapping": "$.PRICE"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "JSONMappingParameters": {
                "RecordRowPath": "$"
            }
        },
        "RecordFormatType": "JSON"
    },
    "RecordEncoding": "UTF-8"
}
```

### CSV 輸入結構描述範例
<a name="sf-map-api-csv-example"></a>

下列範例會示範使用逗號分隔值 (CSV) 格式的結構描述 `InputSchema` 值格式。

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(16)",
            "Name": "LastName"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "FirstName"
        },
        {
            "SqlType": "INTEGER",
             "Name": "CustomerId"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "CSVMappingParameters": {
                "RecordColumnDelimiter": ",",
                "RecordRowDelimiter": "\n"
            }
        },
        "RecordFormatType": "CSV"
    },
    "RecordEncoding": "UTF-8"
}
```

## 將 JSON 資料類型映射到 SQL 資料類型
<a name="sch-mapping-datatypes"></a>

JSON 資料類型轉換為相應的 SQL 資料類型，根據的是應用程式之輸入結構描述。如需支援之 SQL 資料類型的詳細資訊，請參閱[資料類型](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)。Amazon Kinesis Data Analytics 會根據下列規則將 JSON 資料類型轉換為 SQL 資料類型。

### Null 常值
<a name="sch-mapping-datatypes-null"></a>

JSON 輸入串流中的 null 常值 (也就是 `"City":null`) 會轉換為 SQL null，不論目的地資料類型為何。

### 布林常值。
<a name="sch-mapping-datatypes-boolean"></a>

JSON 輸入串流中的布林常值 (也就是 `"Contacted":true`) 會轉換為 SQL 資料，如下所示：
+ 數字 (DECIMAL、INT 等)：`true` 轉換為 1；`false` 轉換為 0。
+ 二進制 (BINARY 或 VARBINARY)：
  + `true`：結果具有最低位元組，並清除剩餘位元數。
  + `false`：結果已清除所有位元數。

  轉換為 VARBINARY 會產生 1 個位元的長度值。
+ 布林值：轉換為相應的 SQL 布林值。
+ 字元 (CHAR 或 VARCHAR)：轉換為對應的字串值 (`true` 或 `false`)。該值被截斷以適合欄位的長度。
+ 日期時間 (DATE、TIME 或 TIMESTAMP)：轉換失敗，強制錯誤會寫入錯誤串流。

### Number
<a name="sch-mapping-datatypes-number"></a>

JSON 輸入串流中的常值 (也就是 `"CustomerId":67321`) 會轉換為 SQL 資料，如下所示：
+ 數字 (DECIMAL、INT 等)：直接轉換。如果轉換後的值超過目標資料類型 (也就是轉換 `123.4` 為 INT) 的大小或精確度，則轉換會失敗，並將強制錯誤寫入錯誤串流。
+ 二進位 (BINARY 或 VARBINARY)：轉換失敗，強制錯誤會寫入錯誤串流。
+ 布林值：
  + `0`：轉換為 `false`。
  + 所有其他數字：轉換為 `true`。
+ 字符（CHAR 或 VARCHAR）：轉換為數字的字符串表示。
+ 日期時間 (DATE、TIME 或 TIMESTAMP)：轉換失敗，強制錯誤會寫入錯誤串流。

### String
<a name="sch-mapping-datatypes-string"></a>

JSON 輸入串流中的字串值 (也就是 `"CustomerName":"John Doe"`) 會轉換為 SQL 資料，如下所示：
+ 數字 (DECIMAL、INT 等)：Amazon Kinesis Data Analytics 會嘗試將值轉換為目標資料類型。如果無法轉換值，則轉換會失敗，並且會將強制錯誤寫入錯誤資料流。
+ 二進位 (BINARY 或 VARBINARY)：如果來源字串是有效的二進位常值 (也就是 `X'3F67A23A'`，具有偶數 f)，該值會轉換為目標資料類型。否則，轉換會失敗，並將強制錯誤寫入錯誤串流。
+ 布林值：如果來源字串為 `"true"`，則會轉換為 `true`。此比較不區分大小寫。否則，會轉換為 `false`。
+ 字元 (CHAR 或 VARCHAR)：轉換為輸入的字串值。如果值長於目標資料類型，則會截斷該值，且不會將錯誤寫入錯誤串流。
+ 日期時間 (DATE、TIME 或 TIMESTAMP)：如果來源字串的格式可轉換為目標值，則會轉換該值。否則，轉換會失敗，並將強制錯誤寫入錯誤串流。

  有效的日期時間格式包括：
  + "1992-02-14"
  + "1992-02-14 18:35:44.0"

### 物件的陣列
<a name="sch-mapping-datatypes-array"></a>

JSON 輸入流中的陣列或物件轉換為 SQL 資料，如下所示：
+ 字元 (CHAR 或 VARCHAR)：轉換為陣列或物件的來源文字。請參閱 [存取陣列](about-json-path.md#about-json-path-arrays)。
+ 其他所有資料類型：轉換失敗，並將強制錯誤寫入錯誤串流。

如需 JSON 陣列的範例，請參閱 [使用 JSONPath](about-json-path.md)。

## 相關主題
<a name="sch-mapping.Related"></a>
+ [設定應用程式輸入](how-it-works-input.md)
+ [資料類型](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)
+ [使用結構描述編輯器](console-summary-edit-schema.md)
+ [CreateApplication](API_CreateApplication.md)
+ [RecordColumn](API_RecordColumn.md)
+ [SourceSchema](API_SourceSchema.md)

# 在串流資料上使用結構描述探索功能
<a name="sch-dis"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法使用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

如要提供輸入結構描述，以說明串流輸入中的記錄如何映射至應用程式內串流，可能會很麻煩且容易出錯。您可以使用 [DiscoverInputSchema](API_DiscoverInputSchema.md) API (稱為*探索 API*) 來推斷結構描述。使用串流來源上的隨機範例記錄，API 可以推斷結構描述 (也就是資料欄名稱、資料類型以及傳入資料中資料元素的位置)。

**注意**  
若要使用探索 API 在 Amazon S3 存放的檔案產生結構描述，請參閱 [在靜態資料上使用結構描述探索功能](sch-dis-ref.md)。

主控台使用探索 API 產生指定串流來源的結構描述。使用主控台，您也可以更新結構描述，包括新增或移除資料欄、變更資料欄名稱或資料類型等。但是，請仔細進行變更，以確保不會建立無效的結構描述。

完成應用程式內串流的結構描述之後，您可以使用一些函數來操作字串和日期時間值。在產生的應用程式內串流中使用資料列時，您可以在應用程式碼中使用這些函數。如需詳細資訊，請參閱[範例：轉換 DateTime 值](app-string-datetime-manipulation.md)。

## 探索結構描述期間的資料欄命名
<a name="sch-dis-column-names"></a>

在結構描述探索期間，Amazon Kinesis Data Analytics 會嘗試從串流輸入來源盡可能保留原始資料欄名稱，但下列情況除外：
+ 來源串流資料欄名稱是保留的 SQL 關鍵字，例如 `TIMESTAMP`、`USER`、`VALUES` 或 `YEAR`。
+ 來源串流資料欄名稱包含不支援的字元。僅允許使用字母、數字和底線 ( \$1 )。
+ 來源串流資料欄名稱以數字開頭。
+ 來源串流資料欄名稱長度超過 100 個字元。

如果重新命名資料欄，則重新命名的結構描述欄名稱會以 `COL_` 開頭。在某些情況下，無法保留原始資料欄名稱，例如整個名稱是不支援的字元。在這種情況下，資料欄被命名為 `COL_#`，\$1 是指示資料欄在順序中的位置數字。

探索完成後，您可以使用主控台更新結構描述，以新增或移除資料欄，或變更資料欄名稱、資料類型或資料大小。

### 探索建議的資料欄名稱範例
<a name="sch-dis-column-names-examples"></a>


****  

| 來源串流資料欄名稱 | 探索建議的資料欄名稱範例 | 
| --- | --- | 
|  USER  |  COL\$1USER  | 
|  USER@DOMAIN  |  COL\$1USERDOMAIN  | 
|  @@  |  COL\$10  | 

## 結構描述探索問題
<a name="sch-dis-when-fails"></a>

如果 Kinesis Data Analytics 未推斷指定串流來源的結構描述，會發生什麼情況？ 

Kinesis Data Analytics 會推斷您的結構描述使用常見的格式，例如 CSV 和 JSON，這些格式都是 UTF-8 編碼。Kinesis Data Analytics 使用自訂資料欄和列分隔符號支援任何 UTF-8 編碼記錄 (包括應用程式日誌和記錄等原始文字)。如果 Kinesis Data Analytics 未推斷結構描述，您可以使用主控台上的結構描述編輯器 (或使用 API) 手動定義結構描述。

 如果您的資料不遵循模式 (您可以使用結構描述編輯器表明)，您可以將結構描述定義為 VARCHAR (N) 類型的單一資料欄，其中 N 是您預期記錄包含的最多字元數。從那裡，您可以使用字串和日期時間操作，在資料傳入應用程式內串流之後建構資料。如需範例，請參閱 [範例：轉換 DateTime 值](app-string-datetime-manipulation.md)。

# 在靜態資料上使用結構描述探索功能
<a name="sch-dis-ref"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

結構描述探索功能，可以從串流資料或儲存在 Amazon S3 儲存貯體的靜態檔案資料產生結構描述。假設您想要為 Kinesis Data Analytics 應用程式產生結構描述，以供參考或因應無法使用即時串流資料的時刻。在靜態檔案上使用結構描述探索功能，該檔案包含串流或參考資料中預期格式的資料樣本。Kinesis Data Analytics 可以針對儲存在 Amazon S3 儲存貯體的 JSON 或 CSV 檔案中的範例資料執行結構描述探索。在資料檔案上使用結構描述探索，該檔案使用主控台或指定了 `S3Configuration` 參數的 [DiscoverInputSchema](API_DiscoverInputSchema.md) API。

## 使用主控台執行結構描述探索
<a name="sch-dis-ref-console"></a>

若要使用主控台在靜態檔案上運行探索，請執行下列動作：

1. 將參考資料物件新增至 S3 儲存貯體。

1. 在 Kinesis Data Analytics 資料分析主控台的應用程式主頁中，選擇**連接參考資料**。

1. 提供儲存貯體、路徑和 IAM 角色資料，以存取包含參考資料的 Amazon S3 物件。

1. 選擇**探索結構描述**。

如需如何在主控台中新增參考資料和探索結構描述的詳細資訊，請參閱 [範例：將參考資料新增至 Kinesis Data Analytics 應用程式](app-add-reference-data.md)。

## 使用主控台執行結構描述探索
<a name="sch-dis-ref-api"></a>

若要使用 API 在靜態檔案上執行探索，請為 API 提供具有下列資訊的 `S3Configuration` 結構：
+ `BucketARN`：內含檔案的 Amazon S3 儲存貯體之 Amazon Resource Name (ARN)。如需 Amazon S3儲存貯體 ARN 的格式，請參閱 [Amazon Resource Names (ARNs) 和 Amazon Service 命名空間：Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-s3)。
+ `RoleARN`：具有 `AmazonS3ReadOnlyAccess` 政策的 IAM 角色之 ARN。如需將政策新增至角色的詳細資訊，請參閱[修改角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html)。
+ `FileKey`：物件的檔案名稱。

**使用 `DiscoverInputSchema` API 從 Amazon S3 物件產生結構描述**

1. 請確定您已 AWS CLI 設定 。如需詳細資訊，請參閱入門指南一節中的 [步驟 2：設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 使用下列內容建立名為 `data.csv` 的檔案：

   ```
   year,month,state,producer_type,energy_source,units,consumption
   2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
   2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
   2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
   2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
   2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681
   ```

1. 至 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/) 登入 Amazon S3 主控台。

1. 建立 Amazon S3 儲存貯體並上傳您建立的 `data.csv` 檔案 請記下您建立之儲存貯體的 ARN。如需有關建立 Amazon S3 儲存貯體及上傳檔案的詳細資訊，請參閱[《Amazon Simple Storage Service 使用者指南》](https://docs.aws.amazon.com/AmazonS3/latest/userguide/GetStartedWithS3.html)。

1. 前往 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 開啟 IAM 主控台。使用 `AmazonS3ReadOnlyAccess` 政策建立角色。請記下新角色的 ARN。如需有關建立角色的詳細資訊，請參閱[建立角色以委派許可給 Amazon 服務](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)。如需將政策新增至角色的詳細資訊，請參閱[修改角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html)。

1. 在 中執行下列`DiscoverInputSchema`命令 AWS CLI，取代 Amazon S3 儲存貯體和 IAM 角色ARNs：

   ```
   $aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN": "arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }' 
   ```

1. 回應看起來類似以下的內容。

   ```
   {
       "InputSchema": {
           "RecordEncoding": "UTF-8",
           "RecordColumns": [
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_year"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_month"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "state"
               },
               {
                   "SqlType": "VARCHAR(64)",
                   "Name": "producer_type"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "energy_source"
               },
               {
                   "SqlType": "VARCHAR(16)",
                   "Name": "units"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "consumption"
               }
           ],
           "RecordFormat": {
               "RecordFormatType": "CSV",
               "MappingParameters": {
                   "CSVMappingParameters": {
                       "RecordRowDelimiter": "\r\n",
                       "RecordColumnDelimiter": ","
                   }
               }
           }
       },
       "RawInputRecords": [
           "year,month,state,producer_type,energy_source,units,consumption\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
       ],
       "ParsedInputRecords": [
           [
               null,
               null,
               "state",
               "producer_type",
               "energy_source",
               "units",
               null
           ],
           [
               "2001",
               "1",
               "AK",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "47615"
           ],
           [
               "2001",
               "1",
               "AK",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "16535"
           ],
           [
               "2001",
               "1",
               "AK",
               "CombinedHeatandPowerElectricPower",
               "Coal",
               "ShortTons",
               "22890"
           ],
           [
               "2001",
               "1",
               "AL",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "3020601"
           ],
           [
               "2001",
               "1",
               "AL",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "2987681"
           ]
       ]
   }
   ```

# 使用 Lambda 函數預處理資料
<a name="lambda-preprocessing"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

如果串流中的資料需要格式轉換、轉換、擴充或篩選，您可以使用 AWS Lambda 函數預先處理資料。您可以在應用程式 SQL 程式碼執行之前，或在應用程式從資料串流建立結構描述之前執行此動作。

在下列案例中，使用 Lambda 函數來預先處理記錄非常有用：
+ 將記錄從其他格式 (例如 KPL 或 GZIP) 轉換為 Kinesis Data Analytics 分析可以分析的格式。Kinesis Data Analytics 目前支援 JSON 或 CSV 資料格式。
+ 將資料擴展為彙總或異常偵測等作業更容易存取的格式。舉例來說，如果多個資料值一起存儲在一個字串中，則可以將資料擴展到單獨的資料欄中。
+ 透過其他 Amazon 服務進行資料充實，例如外推法或錯誤修正。
+ 將複雜的字符串轉換應用於記錄欄位。
+ 用於清理資料的數據過濾。

## 使用 Lambda 函數預處理資料
<a name="lambda-preprocessing-use"></a>

建立 Kinesis Data Analytics 應用程式時，您可以在**連接至來源**頁面中啟用 Lambda 預先處理。

**使用 Lambda 函數預先處理 Kinesis Data Analytics 應用程式中的記錄**

1. 登入 AWS 管理主控台 並開啟 Managed Service for Apache Flink 主控台，網址為 [ https：//https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)。

1. 在應用程式的**連接至來源**頁面上，選擇**記錄預處理方式**] AWS Lambda區段中的**啟用**。

1. 若要使用已建立的 Lambda 函數，請在 **Lambda 函數**下拉式清單中選擇函數。

1. 若要從其中一個 Lambda 預處理範本建立新的 Lambda 函數，請從下拉式清單中選擇範本。然後選擇 **觀看 Lambda 中的 <template name>** 來編輯函數。

1. 選擇**建立新的**來建立新 Lambda 函數。如需有關使用 Lambda 函數的詳細資訊，請參閱《AWS Lambda 開發人員指南》**中的[建立 HelloWorld Lambda 函數並探索主控台](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html)。

1. 選擇要使用的 Lambda 函數版本。若要使用最新版本，請選擇 **\$1LATEST**。

當您選擇或建立 Lambda 函數進行記錄預先處理時，系統會在執行應用程式 SQL 程式碼，或應用程式從記錄產生結構描述之前預先處理記錄。

## Lambda 預處理許可
<a name="lambda-preprocessing-policy"></a>

若要使用 Lambda 預處理，應用程式的 IAM 角色需要下列許可政策：

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Lambda 預處理指標
<a name="lambda-preprocessing-metrics"></a>

您可以使用 Amazon CloudWatch 來監控 Lambda 調用的位元數、成功與失敗情況等。如需 Kinesis Data Analytics 使用 Lambda 預處理發出的 CloudWatch 指標相關資訊，請參閱 [Amazon Kinesis Analytics 指標](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)。

## AWS Lambda 搭配 Kinesis Producer Library 使用
<a name="lambda-preprocessing-deaggregation"></a>

[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) 會將使用者格式化的小型記錄彙整成至多 1 MB 的較大型記錄，以便更妥善利用 Amazon Kinesis Data Streams 輸送量。適用於 Java 的 Kinesis Client Library (KCL) 支援取消彙整這類記錄。不過，當您使用 AWS Lambda 做為串流的取用者時，您必須使用特殊模組來取消彙總記錄。

您可以從 GitHub 取得必要的專案程式碼及相關指示，詳情請參閱[Kinesis Producer Library Deaggregation Modules for AWS Lambda](https://github.com/awslabs/kinesis-deaggregation)。您可以使用此專案中的元件，在 Java、Node.js 和 Python AWS Lambda 中的 中處理 KPL 序列化資料。上述元件也可用於建構[多語言 KCL 應用程式](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java)。

## 資料預處理事件輸入資料模型/記錄響應模型
<a name="lambda-preprocessing-data-model"></a>

若要預處理記錄，您的 Lambda 函數必須符合所需的事件輸入資料和記錄回應模型。

### 事件輸入資料模型
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics 會持續從您的 Kinesis 資料串流或 Firehose 交付串流讀取資料。對於擷取的每批記錄，服務會管理每個批次傳遞至 Lambda 函數的方式。您的函數接收記錄列表作為輸入。在函數中，您可以迭代列表並應用業務邏輯以完成預處理需求（例如資料格式轉換或擴充）。

預先處理函數的輸入模型會略有不同，取決於資料是從 Kinesis 資料串流還是 Firehose 交付串流接收。

如果來源是 Firehose 交付串流，則事件輸入資料模型如下所示：

**Kinesis Data Firehose 請求數據模型**


| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| invocationId | Lambda 調用 ID (隨機 GUID)。 | 
| applicationArn | Kinesis Data Analytics 應用程式的 Amazon Resource Name (ARN) | 
| streamArn | 交付串流 ARN | 
| 紀錄 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 記錄 ID (隨機 GUID) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 編碼來源記錄承載 | 
| approximateArrivalTimestamp | 交付串流記錄大約到達時間 | 

下列範例顯示來自 Firehose 交付串流的輸入：

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

如果來源是 Kinesis 資料串流，則事件輸入資料模型如下：

**Kinesis 串流請求資料模型**


| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| 欄位 | Description | 
| --- | --- | 
| invocationId | Lambda 調用 ID (隨機 GUID)。 | 
| applicationArn | Kinesis Data Analytics 應用程式 ARN | 
| streamArn | 交付串流 ARN | 
| 紀錄 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 以 Kinesis 記錄序號為基礎的記錄 ID | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| 資料 | Base64 編碼來源記錄承載 | 
| sequenceNumber | 來自 Kinesis 串流記錄的序號 | 
| partitionKey | Kinesis 串流記錄中的分割區索引鍵 | 
| shardId | Kinesis 串流記錄的 ShardId | 
| approximateArrivalTimestamp | 交付串流記錄大約到達時間 | 

下列範例顯示 Kinesis 資料串流的輸入：

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### 紀錄回應模型
<a name="lambda-preprocessing-response-model"></a>

必須傳回送至 Lambda 函數的所有從 Lambda 預處理函數傳回的記錄 (含有記錄 ID)。它們必須包含以下參數，否則 Kinesis Data Analytics 會拒絕這類紀錄，並將其視為資料預處理失敗。資料有效承載部分可以轉換，以完成預處理要求。

**回應資料模型**


| 欄位 | Description | 
| --- | --- | 
| 紀錄 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | 在調用期間，記錄 ID 會從 Kinesis Data Analytics 傳遞至 Lambda。轉換記錄必須包含相同的記錄 ID。原始記錄的 ID 與轉換記錄的 ID 若有任何不符，就會視為資料轉換失敗。 | 
| result | 記錄的資料轉換狀態。可能值如下：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | 已轉換資料承載 (base64 編碼後)。如果應用程式擷取資料格式為 JSON，則每個資料承載都可以包含多個 JSON 文件。或者，如果應用程式擷取資料格式為 CSV，則每個列都可以包含多個 CSV 列 (在每一列中指定資料列分隔符號)。Kinesis Data Analytics 服務能夠在相同資料承載中成功剖析和處理包含多個 JSON 文件或 CSV 列的資料。 | 

以下是 Lambda 函數輸出的範例：

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## 常見的資料預處理失敗
<a name="lambda-preprocessing-failures"></a>

以下是預處理失敗的常見原因。
+ 並非所有傳送至 Lambda 函數的批次記錄 (具有記錄 ID) 都會傳回 Kinesis Data Analytics 服務。
+ 回應遺失記錄 ID、狀態或資料承載欄位。資料承載欄位對於 `Dropped` 或 `ProcessingFailed` 記錄而言是選擇性的。
+ Lambda 函數逾時不足以預處理資料。
+ Lambda 函數回應超過 AWS Lambda 服務施加的回應限制。

對於資料預處理失敗，Kinesis Data Analytics 會繼續在同一組記錄上重試 Lambda 調用，直到成功為止。您可以監控下列 CloudWatch 指標來深入暸解故障情況。
+ Kinesis Data Analytics 應用程式 `MillisBehindLatest`：指出應用程式從串流來源讀取落後的程度。
+ Kinesis Data Analytics 應用程式 `InputPreprocessing` CloudWatch 指標：指出成功和失敗的次數，以及其他統計資料。如需詳細資訊，請參閱[Amazon Kinesis Analytics 指標](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)。
+ AWS Lambda 函數 CloudWatch 指標和日誌。

# 建立 Lambda 函數以進行預處理
<a name="lambda-preprocessing-functions"></a>

將記錄擷取至應用程式時，您的 Amazon Kinesis Data Analytics 應用程式可以使用 Lambda 函數來預處理記錄。Kinesis Data Analytics 會在主控台上提供下列範本，作為預處理資料的起點。

**Topics**
+ [

## 在 Node.js 中建立預處理 Lambda 函數
](#lambda-preprocessing-functions-nodejs)
+ [

## 在 Python 中建立預處理 Lambda 函數
](#lambda-preprocessing-functions-python)
+ [

## 在 Java 中建立預處理 Lambda 函數
](#lambda-preprocessing-functions-java)
+ [

## 在 .NET 中建立預處理 Lambda 函數
](#lambda-preprocessing-functions-net)

## 在 Node.js 中建立預處理 Lambda 函數
<a name="lambda-preprocessing-functions-nodejs"></a>

Kinesis Data Analytics 主控台提供了下列在 Node.js 中建立預處理 Lambda 函數的範本：


| Lambda 藍圖 | 語言與版本 | Description | 
| --- | --- | --- | 
| 一般 Kinesis Data Analytics 輸入處理  | Node.js 6.10 |  Kinesis Data Analytics 記錄預處理器，可接收 JSON 或 CSV 記錄做為輸入，然後傳回處理狀態。使用此處理器作為自訂轉換邏輯的起點。  | 
| 壓縮輸入處理 | Node.js 6.10 | Kinesis Data Analytics 記錄預處理器，可接收壓縮的 (GZIP 或 Deflate 壓縮) JSON 或 CSV 記錄做為輸入，然後傳回解壓縮的紀錄與處理狀態。 | 

## 在 Python 中建立預處理 Lambda 函數
<a name="lambda-preprocessing-functions-python"></a>

主控台提供在 Python 中建立預處理 Lambda 函數的下列範本：


| Lambda 藍圖 | 語言與版本 | Description | 
| --- | --- | --- | 
| 一般 Kinesis Analytics 輸入處理  | Python 2.7 |  Kinesis Data Analytics 記錄預處理器，可接收 JSON 或 CSV 記錄做為輸入，然後傳回處理狀態。使用此處理器作為自訂轉換邏輯的起點。  | 
| KPL 輸入處理 | Python 2.7 | 接收 Kinesis 生產者程式庫 (KPL) 的 Kinesis Data Analytics 記錄處理器，彙總 JSON 或 CSV 記錄作為輸入，並傳回具有處理狀態的分解記錄。 | 

## 在 Java 中建立預處理 Lambda 函數
<a name="lambda-preprocessing-functions-java"></a>

如要在 Java 中建立預處理紀錄的 Lambda 函數，請使用 [Java 事件](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events)類別。

下列程式碼會示範使用 Java，且可預處理紀錄的範例 Lambda 函數：

```
public class LambdaFunctionHandler implements
        RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(
            KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("StreamArn is : " + event.streamArn);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>();
        KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
             // Add your record.data pre-processing logic here.                               
            // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>));
        });
        return response;
    }

}
```

## 在 .NET 中建立預處理 Lambda 函數
<a name="lambda-preprocessing-functions-net"></a>

如要在 .NET 中建立預處理紀錄的 Lambda 函數，請使用 [.NET 事件](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)類別。

下列程式碼會示範使用 C\$1，且可預處理紀錄的範例 Lambda 函數：

```
public class Function
    {
        public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsInputPreprocessingResponse
            {
                Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
                context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
                context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add your record preprocessig logic here.

                var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsInputPreprocessingResponse.OK
                };
                preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
                response.Records.Add(preprocessedRecord);
            }
            return response;
        }
    }
```

如需在 .NET 中建立用於預處理和目的地的 Lambda 函數詳細資訊，請參閱 [https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)。

# 平行化輸入串流以提高輸送量
<a name="input-parallelism"></a>

**注意**  
2023 年 9 月 12 日之後，如果尚未使用 Kinesis Data Analytics for SQL，您將無法使用 Kinesis Data Firehose 做為建立新應用程式的來源。如需詳細資訊，請參閱[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

Amazon Kinesis Data Analytics 應用程式可支援多個應用程式內輸入串流，將應用程式擴展到超越單一應用程式內輸入串流的輸送量。如需應用程式內輸入串流的詳細資訊，請參閱 [Amazon Kinesis Data Analytics for SQL 應用程式：運作方式](how-it-works.md)。

在幾乎所有情況下，Amazon Kinesis Data Analytics 都會擴展您的應用程式，以處理饋送至應用程式的 Kinesis 串流或 Firehose 來源串流的容量。不過，如果來源串流的輸送量超過單一應用程式內輸入串流的輸送量，您可以明確增加應用程式使用的應用程式內輸入串流數目。您可以使用 `InputParallelism` 參數來執行此操作。

當 `InputParallelism` 參數大於一時，Amazon Kinesis Data Analytics 會在應用程式內串流中平均分割來源串流的分割區。舉例來說，如果來源串流有 50 個碎片，且您設定 `InputParallelism` 為 `2`，則每個應用程式內輸入串流都會接收來自 25 個來源串流碎片的輸入。

增加應用程式內串流的數量時，您的應用程式必須明確存取每個串流中的資料。如需在程式碼中存取多個應用程式內串流的相關資訊，請參閱 [在 Amazon Kinesis Data Analytics 應用程式中存取個別應用程式內串流](#input-parallelism-code-example)。

雖然 Kinesis Data Streams 和 Firehose 串流碎片在應用程式內串流之間以相同的方式分割，但它們在您的應用程式上顯示的方式有所不同：
+ Kinesis 資料串流中的記錄包含 `shard_id` 欄位，可用來識別記錄來源碎片。
+ Firehose 交付串流中的記錄不包含可識別記錄來源碎片或分割區的欄位。這是因為 Firehose 會將此資訊從您的應用程式抽象化。

## 評估是否增加應用程式內輸入串流的數量
<a name="input-parallelism-evaluating"></a>

在大多數情況下，單一應用程式內輸入串流可以處理單一來源串流的輸送量，視輸入串流的複雜性和資料大小而定。若要判斷是否需要增加應用程式內輸入串流的數量，您可以在 Amazon CloudWatch 中監控 `InputBytes` 和 `MillisBehindLatest` 指標。

如果 `InputBytes` 指標大於每秒 100 MB (或者您預期它會大於此速率)，這可能會導致 `MillisBehindLatest` 提高，並增加應用程式問題的影響。為了解決這個問題，我們建議您為應用程式選擇下列語言：
+ 如果應用程式的擴展需求超過每秒 100 MB，請針對 SQL 應用程式使用多個串流和 Kinesis Data Analytics。
+ 如果您想要使用單一串流和應用程式，請使用[Kinesis Data Analytics for Java 應用程式](/managed-flink/latest/java/what-is.html)。

如果 `MillisBehindLatest` 指標具有下列任一特性，則應增加應用程式的 `InputParallelism` 設定：
+ `MillisBehindLatest` 指標逐漸增加，表示您的應用程式逐漸落後串流中的最新資料。
+ `MillisBehindLatest` 指標一直高於 1000 (一秒)。

如果符合下列條件，就不需要增加應用程式的 `InputParallelism` 設定：
+ `MillisBehindLatest` 指標逐漸減少，表示您的應用程式逐漸趕上串流中的最新資料。
+ `MillisBehindLatest` 指標低於 1000 (一秒)。

如需有關 CloudWatch 的詳細資訊，請參閱 [CloudWatch 使用者指南](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)。

## 實作多個應用程式內輸入串流
<a name="input-parallelism-implementing"></a>

在使用 [CreateApplication](API_CreateApplication.md) 建立應用程式時，您可以設定應用程式內輸入串流的數目。您可以使用 [UpdateApplication](API_UpdateApplication.md) 在建立應用程式後設定此數字。

**注意**  
您只能使用 Amazon Kinesis Data Analytics API 或 AWS CLI來做出 `InputParallelism` 設定。您無法使用 設定此設定 AWS 管理主控台。如需設定 的資訊 AWS CLI，請參閱 [步驟 2：設定 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

### 設定新應用程式的輸入串流計數
<a name="input-parallelism-implementing-create"></a>

以下範例示範如何使用 `CreateApplication` API 動作，將新的應用程式輸入串流計數設定為 2。

如需 `CreateApplication` 的相關資訊，請參閱 [CreateApplication](API_CreateApplication.md)。

```
{
   "ApplicationCode": "<The SQL code the new application will run on the input stream>",
   "ApplicationDescription": "<A friendly description for the new application>",
   "ApplicationName": "<The name for the new application>",
   "Inputs": [ 
    { 
      "InputId": "ID for the new input stream",
      "InputParallelism": { 
        "Count": 2
    }],
   "Outputs": [ ... ],
	}]
}
```

### 設定現有應用程式的輸入串流計數
<a name="input-parallelism-implementing-update"></a>

以下範例示範如何使用 `UpdateApplication` API 動作，將現有應用程式輸入串流計數設定為 2。

如需 `Update_Application` 的相關資訊，請參閱 [UpdateApplication](API_UpdateApplication.md)。

```
{
   "InputUpdates": [ 
      { 
         "InputId": "yourInputId",
         "InputParallelismUpdate": { 
            "CountUpdate": 2
         }
      }
   ],
}
```

## 在 Amazon Kinesis Data Analytics 應用程式中存取個別應用程式內串流
<a name="input-parallelism-code-example"></a>

若要在應用程式中使用多個應用程式內輸入串流，您必須從不同的串流中明確選取。下列程式碼範例，示範了如何在入門教學課程建立的應用程式中查詢多個輸入串流。

在下列範例中，會先使用 [COUNT](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-count.html) 彙總每個來源資料流，然後再合併到名為 `in_application_stream001` 的單一應用程式內串流。事先彙總來源串流，有助於確保合併的應用程式內串流可以處理來自多個串流的流量，而不會超載。

**注意**  
若要執行此範例並從應用程式內輸入串流取得結果，請同時更新來源串流中的碎片數目，和應用程式中的 `InputParallelism` 參數。

```
CREATE OR REPLACE STREAM in_application_stream_001 (
    ticker VARCHAR(64),
    ticker_count INTEGER
);

CREATE OR REPLACE PUMP pump001 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_001
GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol; 
        
CREATE OR REPLACE PUMP pump002 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_002
GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol;
```

上述程式碼範例會在 `in_application_stream001` 產生輸出，類似下列所示：

![\[Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.\]](http://docs.aws.amazon.com/zh_tw/kinesisanalytics/latest/dev/images/input-parallelism-results.png)


## 其他考量
<a name="input-parallelism-considerations"></a>

當您使用多個輸入串流時，請注意下列事項：
+ 應用程式內輸入串流數量上限為 64 個。
+ 應用程式內的輸入串流，會平均分佈在應用程式輸入串流的碎片之間。
+ 新增應用程式內串流所帶來的效能不會線性擴展。也就是說，加倍應用程式內串流的數量並不會讓輸送量增加一倍。使用典型的資料列大小，每個應用程式內串流可達到每秒約 5,000 至 15,000 個資料列的輸送量。將應用程式內串流計數增加到 10，您可以達到每秒 20,000 到 30,000 個資料列的輸送量。輸送量速度取決於輸入串流中欄位的計數、資料類型和資料大小。
+ 當套用至分割成不同碎片的輸入資料流時，某些彙總函式 (例如 [AVG](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-avg.html)) 可能會產生非預期的結果。因為您必須先在個別碎片上執行彙總作業，然後再將它們合併到彙總串流中，因此結果可能會加權為包含更多記錄的任何資料流。
+ 如果在增加輸入串流數量後，您的應用程式在持續效能不佳 (反映在高 `MillisBehindLatest` 指標)，您可能已達到 Kinesis 處理單元 (KPU) 的上限。如需詳細資訊，請參閱[自動擴展應用程式以增加輸送量](how-it-works-autoscaling.md)。