

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis Data Analytics for SQL Applications: 仕組み
<a name="how-it-works"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

アプリケーションは、アカウントで作成できる Amazon Kinesis Data Analytics のプライマリリソースです。 AWS マネジメントコンソール または Kinesis Data Analytics API を使用して、アプリケーションを作成および管理できます。Kinesis Data Analytics は、アプリケーションを管理するための API オペレーションを提供しています。API オペレーションのリストについては、「[アクション](API_Operations.md)」を参照してください。

Kinesis Data Analytics アプリケーションは、ストリーミングデータをリアルタイムで継続的に読み取り処理します。SQL を使用してアプリケーションコードを記述し、受信ストリーミングデータを処理して出力を生成します。こうすることで、Kinesis Data Analytics が出力を設定された宛先に書き込みます。次の図は、一般的なアプリケーションのアーキテクチャーです。

![\[データ分析アプリケーション、ストリーミング入力ソース、リファレンスデータ、およびアプリケーション出力を示す図。\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/kinesis-app.png)


各アプリケーションには、名前、説明、バージョン ID、ステータスがあります。Amazon Kinesis Data Analytics は、最初にアプリケーションを作成するときに、バージョン ID を割り当てます。このバージョン ID は、アプリケーション設定の更新時に更新されます。例えば、入力設定の追加、リファレンスデータソースの追加または削除、または出力設定の追加または削除、またはアプリケーションコードの更新などの際に、Kinesis Data Analytics によって現在のアプリケーションバージョン ID が更新されます。また、Kinesis Data Analytics はアプリケーションの作成時および最終更新時のタイムスタンプも保持します。

これらの基本プロパティに加えて、各アプリケーションは、以下で構成されます。
+ **入力** – アプリケーションのストリーミングソース。ストリーミングソースとして、Kinesis データストリームと Firehose データ配信ストリームのいずれかを選択できます。入力設定で、アプリケーション内入力ストリームにストリーミングソースをマッピングします。アプリケーション内ストリームは、`SELECT` および `INSERT SQL` オペレーションを実行できる、継続的に更新されるテーブルのようなものです。アプリケーションコードで、中間クエリ結果を保存するための追加のアプリケーション内ストリームを作成することもできます。

   

  オプションで、スループットを向上させるために単一のソースストリーミングを複数のアプリケーション内入力ストリームに分割できます。詳細については、「[制限](limits.md)」および「[アプリケーション入力の設定](how-it-works-input.md)」を参照してください。

   

  Amazon Kinesis Data Analytics は、各アプリケーション内ストリームに [タイムスタンプと ROWTIME 列](timestamps-rowtime-concepts.md) というタイムスタンプ列を提供します。この列は時間ベースウィンドウのクエリで使用できます。詳細については、「[ウィンドウクエリ](windowed-sql.md)」を参照してください。

   

  オプションでリファレンスデータソースを設定してアプリケーション内の入力データストリームを強化できます。これはアプリケーション内リファレンステーブルになります。リファレンスデータは S3 バケット内のオブジェクトとして保存する必要があります。アプリケーションが起動すると、Amazon Kinesis Data Analytics は Amazon S3 オブジェクトを読み取り、アプリケーション内テーブルを作成します。詳細については、「[アプリケーション入力の設定](how-it-works-input.md)」を参照してください。

   
+ **アプリケーションコード** – 入力を処理し出力を生成する一連の SQL ステートメントです。アプリケーション内ストリームおよびリファレンステーブルに対して SQL ステートメントを書くことができます。また、JOIN クエリを作成してこれらのソース両方からのデータを結合できます。

   

  Kinesis Data Analytics でサポートされている SQL 言語要素の詳細については、「[Amazon Kinesis Data Analytics SQL Reference](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)」を参照してください。

   

  最もシンプルな形式のアプリケーションコードは、ストリーミング入力から選択して結果をストリーミング出力に挿入する単一の SQL ステートメントになります。また、1 つのフィードを出力して次の SQL ステートメントに入力する一連の SQL ステートメントになることもあります。さらに、入力ストリームを複数のストリームに分割するためのアプリケーションコードを書くことができます。その後、これらのストリームを処理するために追加のクエリを適用できます。詳細については、「[アプリケーションコード](how-it-works-app-code.md)」を参照してください。

   
+ **出力** – アプリケーションコードでは、クエリ結果はアプリケーション内ストリームに入力されます。アプリケーションコードでは、中間結果を保存する 1 つ以上のアプリケーション内ストリームを作成することもできます。その後、オプションでアプリケーション出力を設定してアプリケーション内ストリームのデータを永続化し、アプリケーション出力 (アプリケーション内出力ストリームともいいます) を外部宛先に保持します。外部宛先には、Firehose 配信ストリームまたは Kinesis データストリームを使用できます。これらの宛先について、次の点に注意してください。
  + Firehose 配信ストリームは、結果を Amazon S3、Amazon Redshift、または Amazon OpenSearch Service (OpenSearch Service) に書き込むように設定できます。

     
  + Amazon S3 や Amazon Redshift ではなく、カスタム宛先に出力するようにアプリケーションを作成することもできます。そのためには、出力環境設定で出力先として Kinesis データストリームを指定します。次に、ストリームをポーリングして Lambda 関数を呼び出す AWS Lambda ように を設定します。Lambda 関数コードはストリームデータを入力として受け取ります。Lambda 関数コードで、入力データをカスタム宛先に書き込むことができます。詳細については、[Amazon Kinesis Data Analytics AWS Lambda での の使用](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html)」を参照してください。

  詳細については、「[アプリケーション出力の設定](how-it-works-output.md)」を参照してください。

以下の点にも注意してください。
+ Amazon Kinesis Data Analytics には、ストリーミングソースからレコードを読み取り、アプリケーション出力を外部宛先に書き込むためのアクセス権限が必要です。IAM ロールを使用してこれらのアクセス権限を付与します。

   
+ Kinesis Data Analytics は自動的に各アプリケーションにアプリケーション内エラーストリームを提供します。特定のレコードの処理中にアプリケーションで問題 (たとえばタイプの不一致や到着の遅延など) が発生した場合、そのレコードはエラーストリームに書き込まれます。あとで評価するために、アプリケーション出力を設定して、Kinesis Data Analytics がエラーストリームデータを外部宛先で永続化するようにできます。詳細については、「[エラー処理](error-handling.md)」を参照してください。

   
+ Amazon Kinesis Data Analytics は、アプリケーション出力レコードを設定された宛先に確実に書き込みます。アプリケーションが中断された場合ても、配信モデルでは「少なくとも 1 回」処理を使用します。詳細については、「[アプリケーション出力を外部宛先で永続化する配信モデル](failover-checkpoint.md)」を参照してください。

**Topics**
+ [アプリケーション入力の設定](how-it-works-input.md)
+ [アプリケーションコード](how-it-works-app-code.md)
+ [アプリケーション出力の設定](how-it-works-output.md)
+ [エラー処理](error-handling.md)
+ [アプリケーションを自動的にスケーリングしてスループットを向上させる](how-it-works-autoscaling.md)
+ [タグ付けの使用](how-tagging.md)

# アプリケーション入力の設定
<a name="how-it-works-input"></a>

Amazon Kinesis Data Analytics アプリケーションでは、1 つのストリーミングソースから入力を受け取ることができます。また、オプションで 1 つのリファレンスデータソースを使用できます。詳細については、「[Amazon Kinesis Data Analytics for SQL Applications: 仕組み](how-it-works.md)」を参照してください。このトピックのセクションでは、アプリケーション入力ソースについて説明します。

**Topics**
+ [ストリーミングソースの設定](#source-streaming)
+ [リファレンスソースの設定](#source-reference)
+ [JSONPath の操作](about-json-path.md)
+ [SQL 入力列へのストリーミングソース要素のマッピング](sch-mapping.md)
+ [ストリーミングデータのスキーマ検出機能の使用](sch-dis.md)
+ [静的データに対するスキーマ検出機能の使用](sch-dis-ref.md)
+ [Lambda 関数を使用したデータの事前処理](lambda-preprocessing.md)
+ [スループットの増加に合わせた入力ストリームの並列処理](input-parallelism.md)

## ストリーミングソースの設定
<a name="source-streaming"></a>

アプリケーションを作成するときに、ストリーミングソースを指定します。アプリケーションを作成した後に入力を変更することもできます。Amazon Kinesis Data Analytics では、アプリケーションに対して以下のストリーミングソースがサポートされています。
+ Kinesis データストリーム 
+ Firehose 配信ストリーム

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。`KinesisFirehoseInput` と一緒に Kinesis Data Analytics for SQL アプリケーションを使用している既存のお客様は、Kinesis Data Analytics を使用して既存のアカウント内で、引き続き `KinesisFirehoseInput` でアプリケーションを追加できます。既存のお客様で、`KinesisFirehoseInput` と一緒に Kinesis Data Analytics for SQL アプリケーションを使用して新規アカウントを作成される場合は、サービス制限拡大フォームを通してケースを作成できます。詳細については、[AWS サポート センター](https://console.aws.amazon.com/support/home#/)を参照してください。新しいアプリケーションを本番環境に移行する前に、必ずテストすることをお勧めします。

**注記**  
Kinesis データストリームが暗号化されている場合、Kinesis Data Analytics は暗号化されたストリームのデータにシームレスにアクセスし、それ以上の設定は必要ありません。Kinesis Data Analytics では、Kinesis Data Streams から読み取った暗号化されていないデータは保存されません。詳細については、「[Kinesis Data Streams 用のサーバー側の暗号化とは](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html)」を参照してください。

Kinesis Data Analytics は、ストリーミングソースに新しいデータがあるか継続的にポーリングし、入力設定に応じてアプリケーション内ストリームに取り込みます。

**注記**  
アプリケーションの入力として Kinesis ストリームを追加しても、ストリーム内のデータには影響を与えません。同じ Kinesis ストリームに Firehose 配信ストリームなどの別のリソースもアクセスした場合、Firehose 配信ストリームと Kinesis Data Analytics アプリケーションの両方が同じデータを受け取ります。ただし、スループットとスロットリングは影響を受ける可能性があります。

アプリケーションコードは、アプリケーション内ストリームをクエリできます。入力設定の一部として以下を指定します。
+ **ストリーミングソース** – ストリームの Amazon リソースネーム (ARN) と、Kinesis Data Analytics がユーザーに代わってストリームにアクセスするために引き受けることができる IAM ロールを指定します。
+ **アプリケーション内ストリーム名のプレフィックス** – アプリケーションを起動すると、Kinesis Data Analytics によって指定されたアプリケーション内ストリームが作成されます。この名前を使用して、アプリケーションコードでアプリケーション内ストリームにアクセスします。

  オプションで、ストリーミングリソースを複数のアプリケーション内ストリームにマッピングできます。詳細については、「[制限](limits.md)」を参照してください。この場合、Amazon Kinesis Data Analytics は、*prefix*`_001`、*prefix*`_002`、*prefix*`_003` などの名前で、アプリケーション内ストリームを指定された数だけ作成します。デフォルトでは、Kinesis Data Analytics はストリーミングソースを *prefix*`_001` という名前の 1 つのアプリケーション内ストリームにマッピングします。

  アプリケーション内ストリームに挿入できる行は、速度の制限があります。そのため、Kinesis Data Analytics では複数のアプリケーション内ストリームをサポートして、より高速にアプリケーションにレコードを届けます。アプリケーションがストリーミングソースのデータをアップし続けることができない場合は、並列処理ユニットを追加してパフォーマンスを向上させることができます。
+ **マッピングスキーマ** – ストリーミングソースでのレコード形式 (JSON、CSV) を指定します。また、ストリームの各レコードが、作成されるアプリケーション内ストリーム内の列にマッピングされる方法も指定します。ここで、列名とデータ型を指定します。

**注記**  
Kinesis Data Analytics は、入力アプリケーション内ストリームの作成時に、識別子 (ストリーム名および列名) に引用符を追加します。このストリームと列をクエリする場合は、完全一致 (大文字と小文字が正確に一致) を使用して引用符内を指定する必要があります。識別子の詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「[Identifiers](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html)」を参照してください。

Amazon Kinesis Data Analytics コンソールでアプリケーションを作成し、入力を設定できます。その後、コンソールは必要な API コールを行います。アプリケーション入力の設定は、新しいアプリケーション API の作成時、または既存のアプリケーションに入力設定を追加するときに行うことができます。詳細については、「[CreateApplication](API_CreateApplication.md)」および「[AddApplicationInput](API_AddApplicationInput.md)」を参照してください。以下は `Createapplication` API リクエストボディの入力設定部分です。

```
 "Inputs": [
        {
            "InputSchema": {
                "RecordColumns": [
                    {
                        "Mapping": "string",
                        "Name": "string",
                        "SqlType": "string"
                    }
                ],
                "RecordEncoding": "string",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": "string",
                            "RecordRowDelimiter": "string"
                        },
                        "JSONMappingParameters": {
                            "RecordRowPath": "string"
                        }
                    },
                    "RecordFormatType": "string"
                }
            },
            "KinesisFirehoseInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "KinesisStreamsInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "Name": "string"
        }
    ]
```

## リファレンスソースの設定
<a name="source-reference"></a>

オプションで、既存のアプリケーションにリファレンスデータソースを追加して、ストリーミングソースから送信されるデータを強化することもできます。リファレンスデータは Amazon S3 バケット内のオブジェクトとして保存する必要があります。アプリケーションが起動すると、Amazon Kinesis Data Analytics は Amazon S3 オブジェクトを読み取り、アプリケーション内リファレンステーブルを作成します。その後、アプリケーションコードでこれをアプリケーション内ストリームと結合できます。

サポートされている形式 (CSV、JSON) で、Amazon S3 オブジェクトにリファレンスデータを保存します。たとえば、アプリケーションで株注文を分析すると仮定します。ストリーミングソースには次に示すレコード形式があるとします。

```
Ticker, SalePrice, OrderId

AMZN     $700        1003
XYZ      $250        1004
...
```

この場合、会社名などの詳細を株価ティッカーに提供するリファレンスデータソースを保持することを検討する場合があります。

```
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
```

アプリケーションのリファレンスデータソースは、API またはコンソールで追加できます。Amazon Kinesis Data Analytics では、以下の API アクションで、リファレンスデータソースを管理します。
+  [AddApplicationReferenceDataSource](API_AddApplicationReferenceDataSource.md)
+ [UpdateApplication](API_UpdateApplication.md)

コンソールを使用してリファレンスデータを追加する方法の詳細については、「[例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する](app-add-reference-data.md)」を参照してください。

次の点に注意してください。
+ アプリケーションが実行されている場合、Kinesis Data Analytics はアプリケーション内リファレンステーブルを作成し、ただちにリファレンスデータをロードします。
+ アプリケーションが実行されていない (例えば、準備完了状態など) 場合、Kinesis Data Analytics は更新された入力設定を保存するだけです。アプリケーションの実行が始まると、Kinesis Data Analytics はリファレンスデータをテーブルとしてアプリケーションにロードします。

Kinesis Data Analytics がアプリケーション内リファレンステーブルを作成した後で、データを更新するとします。Amazon S3 オブジェクトを更新したり、別の Amazon S3 オブジェクトを使用したいという場合があるかもしれません。この場合は、[UpdateApplication](API_UpdateApplication.md) を明示的に呼び出すか、コンソールで [**アクション**]、[**リファレンスデータテーブルを同期**] の順に選択します。Kinesis Data Analytics では、アプリケーション内リファレンステーブルは自動的に更新されません。

リファレンスデータソースとして作成できる Amazon S3 オブジェクトには、サイズの制限があります。詳細については、「[制限](limits.md)」を参照してください。オブジェクトのサイズが制限を超えた場合には、Kinesis Data Analytics でデータをロードできません。アプリケーションの状態は実行中と表示されますが、データが読み込まれません。

リファレンスデータソースを追加する場合は、次の情報を指定します。
+ **S3 バケットおよびオブジェクトキー名** – バケット名およびオブジェクトキーに加えて、ユーザーの代わりにオブジェクトを読み取るために Kinesis Data Analytics が引き受けることができる IAM ロールも指定します。
+ **アプリケーション内リファレンステーブル名** – Kinesis Data Analytics がこのアプリケーション内テーブルを作成し、Amazon S3 オブジェクトを読み取ってそこに入力します。これは、アプリケーションコードで指定するテーブルの名前です。
+ **マッピングスキーマ** – レコード形式 (JSON, CSV)、Amazon S3 オブジェクトに保存されたデータのエンコードを記述します。また、各データ要素がどのようにアプリケーション内リファレンステーブルにマッピングされるかも記述します。

以下に、`AddApplicationReferenceDataSource` API リクエストの本文を示します。

```
{
    "applicationName": "string",
    "CurrentapplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
```

# JSONPath の操作
<a name="about-json-path"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

JSONPath は、JSON オブジェクトの要素をクエリする標準化された方法です。JSONPath はパス式を使用して、JSON ドキュメントの要素、入れ子要素、配列に移動します。JSON の詳細については、「[JSON の入門](http://www.json.org/)」を参照してください。

Amazon Kinesis Data Analytics はアプリケーションのソーススキーマで JSONPath 式を使用して、JSON 形式のデータがあるストリーミングソースのデータ要素を識別します。

アプリケーションの入力ストリームにストリーミングデータをマッピングする方法の詳細については、「[SQL 入力列へのストリーミングソース要素のマッピング](sch-mapping.md)」を参照してください。

## JSONPath を使用した JSON 要素へのアクセス
<a name="about-json-path-elements"></a>

JSONPath 式を使用して JSON 形式のさまざまなデータにアクセスする方法について説明します。このセクションの例では、ソースストリームに次の JSON レコードが含まれていると想定します。

```
{
  "customerName":"John Doe",
  "address":
  {
    "streetAddress":
    [
      "number":"123",
      "street":"AnyStreet"
    ],
    "city":"Anytown"
  }
  "orders":
  [
    { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
    { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
    { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
  ]
}
```

### JSON 要素へのアクセス
<a name="about-json-path-firstlevel"></a>

JSONPath を使用して JSON データの要素をクエリするには、次の構文を使用します。ここで、`$` はデータ階層のルート、`elementName` はクエリを実行する要素ノードの名前を表します。

```
$.elementName
```

次の式では、前述の JSON の例の `customerName` 要素をクエリします。

```
$.customerName
```

前述の式では、前述の JSON のレコードから次のように返ります。

```
John Doe
```

**注記**  
Path 式では、大文字と小文字が区別されます。式 (`$.customername`) では、前述の JSON の例から `null` が返ります。

**注記**  
パス式で指定した場所に要素が表示されない場合、式は `null` を返します。次の式の例では、前述の JSON の例から `null` が返ります。これは一致する要素がないためです。  

```
$.customerId
```

### ネストされた JSON 要素へのアクセス
<a name="about-json-path-nested"></a>

ネストされた JSON 要素をクエリするには、次の構文を使用します。

```
$.parentElement.element
```

次の式では、前述の JSON の例の `city` 要素をクエリします。

```
$.address.city
```

前述の式では、前述の JSON のレコードから次のように返ります。

```
Anytown
```

以下の構文を使用して、深いレベルのサブ要素をクエリすることができます。

```
$.parentElement.element.subElement
```

次の式では、前述の JSON の例の `street` 要素をクエリします。

```
$.address.streetAddress.street
```

前述の式では、前述の JSON のレコードから次のように返ります。

```
AnyStreet
```

### 配列へのアクセス
<a name="about-json-path-arrays"></a>

JSON 配列内のデータにアクセスするには、以下の方法があります。
+ 配列内のすべての要素を単一の行として取得します。
+ 配列内の各要素を別々の行として取得します。

#### 配列内のすべての要素を単一の行として取得する
<a name="about-json-path-arrays-row"></a>

配列のコンテンツ全体を単一の行としてクエリを実行するには、次の構文を使用します。

```
$.arrayObject[0:]
```

次の式では、このセクションで使用した前述の JSON の例の `orders` 要素をクエリします。配列の内容は、単一行の 1 つの列で返ります。

```
$.orders[0:]
```

前述の式は、このセクションで使用した JSON レコード例から次を返します。

```
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"},{"orderId":"61322","itemName":"Gadget","itemPrice":"22.50"},{"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]
```

#### 配列内のすべての要素を別々の行として取得する
<a name="about-json-path-arrays-separate"></a>

配列内の各要素を別々の行としてクエリを実行するには、次の構文を使用します。

```
$.arrayObject[0:].element
```

次の式では、前述の JSON の例の `orderId` 要素をクエリし、配列内の各要素は別々の行で返ります。

```
$.orders[0:].orderId
```

前述の例では、前述の JSON レコードから以下のように返ります。その際、各データ項目は別々の行で返ります。


****  

|  | 
| --- |
|  23284  | 
|  63122  | 
|  77284  | 

**注記**  
非配列要素をクエリする式が各配列要素をクエリするスキーマに含まれている場合、配列内の各要素に対して非配列要素が繰り返されます。たとえば、前述の JSON の例のスキーマに、次の式が含まれていると仮定します。  
\$1.customerName
\$1.orders[0:].orderId
この場合、サンプルの入力ストリーム要素から返されるデータ行は次のようになります。`orderId` 要素それぞれに対して、`name` 要素が繰り返されます。  


****  

|  |  | 
| --- |--- |
|  John Doe  |  23284  | 
|  John Doe  |  63122  | 
|  John Doe  |  77284  | 

**注記**  
Amazon Kinesis Data Analytics の配列式には、次の制限が適用されます。  
配列式でサポートされる参照解除レベルは 1 つのみです。次の式形式はサポートされていません。  

  ```
  $.arrayObject[0:].element[0:].subElement
  ```
スキーマにフラット化できる配列は 1 つのみです。複数の配列を参照する場合、配列のすべての要素を含む 1 つの行で返されます。ただし、個々の行として返る各要素には、1 つの配列のみ持つことができます。  
次の形式の要素を含むスキーマは有効です。この形式では、1 つの列として 2 番目の配列の内容が返り、最初の配列の各要素が繰り返されます。  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:]
  ```
次の形式の要素を含むスキーマは無効です。  

  ```
  $.arrayObjectOne[0:].element
  $.arrayObjectTwo[0:].element
  ```

## その他の考慮事項
<a name="about-json-path-other"></a>

JSONPath を操作するには、他にも次のような考慮事項があります。
+ アプリケーションスキーマの JSONPath 式に各要素からアクセスされる配列がない場合、処理される JSON レコードごとに単一の行がアプリケーションの入力ストリームに作成されます。
+ 配列がフラット化された場合 (つまり、要素が個別の行として返される場合)、null 値となる欠落した要素はすべてアプリケーション内のストリームで作成されます。
+ 配列は常に少なくとも 1 つの行にフラット化されます。返される値がない場合 (つまり、配列が空またはクエリされている要素が存在しない)、すべての値が null で 1 つの行が返ります。

  次の式では、前述の JSON の例から null 値を含むレコードが返ります。これは、指定のパスに一致する要素がないためです。

  ```
  $.orders[0:].itemId
  ```

  前述の例では、前述の JSON のサンプルレコードから次のように返ります。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/about-json-path.html)

## 関連トピック
<a name="about-json-path.Related"></a>
+ [JSON のご紹介](http://www.json.org/)

# SQL 入力列へのストリーミングソース要素のマッピング
<a name="sch-mapping"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

Amazon Kinesis Data Analytics で、標準 SQL を使用して JSON 形式または CSV 形式でストリーミングデータを処理および分析します。
+ ストリーミングする CSV データを処理して分析するには、入力ストリームの列に列名とデータ型を割り当てます。入力ストリームから列定義ごとに 1 つの列が順番にアプリケーションでインポートされます。

  アプリケーションの入力ストリームにすべての列を含める必要はありませんが、ソースストリームから列をスキップすることはできません。たとえば、5 つの要素を含む入力ストリームから最初の 3 つの列をインポートすることはできますが、列 1、2、4 のみインポートすることはできません。
+ ストリーミング JSON データを処理して分析するには、JSONPath 式を使用して、ストリーミングソースの JSON 要素を入力ストリームの SQL 列にマッピングします。Amazon Kinesis Data Analytics で JSONPath を使用する方法の詳細については、「[JSONPath の操作](about-json-path.md)」を参照してください。SQL テーブルの列は、JSON 型からマッピングされたデータ型です。サポートされているデータ型のリストについては、「[データ型](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)」を参照してください。JSON データを SQL データに変換する方法については、「[JSON データ型から SQL データ型へのマッピング](#sch-mapping-datatypes)」を参照してください。

入力ストリームの設定方法の詳細については、「[アプリケーション入力の設定](how-it-works-input.md)」を参照してください。

## SQL 列への JSON データのマッピング
<a name="sch-mapping-json"></a>

 AWS マネジメントコンソール または Kinesis Data Analytics API を使用して、JSON 要素を入力列にマッピングできます。
+ コンソールを使用して要素を列にマッピングするには、「[スキーマエディタの使用](console-summary-edit-schema.md)」を参照してください。
+ Kinesis Data Analytics API を使用して要素を列にマッピングするには、次のセクションを参照してください。

JSON 要素をアプリケーション内の入力ストリームの列にマッピングするには、スキーマで、以下の情報が各列に必要です。
+ **ソース式:** 列のデータの場所を識別する JSONPath 式。
+ **列名:** SQL クエリでデータの参照に使用する名前。
+ **データ型: **列の SQL データ型。

## API を使用する場合
<a name="sf-map-api"></a>

ストリーミングソースから入力列に要素をマッピングするには、Kinesis Data Analytics API ([CreateApplication](API_CreateApplication.md)) のアクションを使用できます。アプリケーション内ストリームを作成するには、SQL で使用するスキーマ化されたバージョンにデータを変換するスキーマを指定します。この [CreateApplication](API_CreateApplication.md) アクションでは、単一のストリーミングソースから入力を受信するようにアプリケーションを設定します。JSON 要素または CSV 列を SQL 列にマッピングするには、「[RecordColumn](API_RecordColumn.md)オブジェクト[SourceSchema](API_SourceSchema.md)」を `RecordColumns` 配列内に作成します。[RecordColumn](API_RecordColumn.md) オブジェクトには以下のスキーマがあります。

```
{ 
    "Mapping": "String",
    "Name": "String",
    "SqlType": "String"
}
```

[RecordColumn](API_RecordColumn.md) オブジェクトのフィールドには、次の値があります。
+ `Mapping`: JSONPath 式は、入力ストリームレコードのデータの場所を識別します。ソースストリームの入力スキーマで、この値は CSV 形式で表示されません。
+ `Name`: アプリケーション内 SQL データストリームの列名。
+ `SqlType`: アプリケーション内 SQL データストリームのデータのデータ型。

### JSON 入力スキーマの例
<a name="sf-map-api-json-example"></a>

次の例では、JSON スキーマの `InputSchema` の値の形式について説明します。

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(4)",
            "Name": "TICKER_SYMBOL",
            "Mapping": "$.TICKER_SYMBOL"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "SECTOR",
            "Mapping": "$.SECTOR"
        },
        {
            "SqlType": "TINYINT",
            "Name": "CHANGE",
            "Mapping": "$.CHANGE"
        },
        {
            "SqlType": "DECIMAL(5,2)",
            "Name": "PRICE",
            "Mapping": "$.PRICE"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "JSONMappingParameters": {
                "RecordRowPath": "$"
            }
        },
        "RecordFormatType": "JSON"
    },
    "RecordEncoding": "UTF-8"
}
```

### CSV 入力スキーマの例
<a name="sf-map-api-csv-example"></a>

次の例では、カンマ区切り (CSV) 形式の `InputSchema` の値の形式について説明します。

```
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(16)",
            "Name": "LastName"
        },
        {
            "SqlType": "VARCHAR(16)",
            "Name": "FirstName"
        },
        {
            "SqlType": "INTEGER",
             "Name": "CustomerId"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "CSVMappingParameters": {
                "RecordColumnDelimiter": ",",
                "RecordRowDelimiter": "\n"
            }
        },
        "RecordFormatType": "CSV"
    },
    "RecordEncoding": "UTF-8"
}
```

## JSON データ型から SQL データ型へのマッピング
<a name="sch-mapping-datatypes"></a>

JSON データ型は、アプリケーションの入力スキーマに基づき、対応する SQL データ型に変換されます。サポートされている SQL データ型の詳細については、「[データ型](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)」を参照してください。Amazon Kinesis Data Analytics では、次のルールに基づき、JSON データ型から SQL データ型に変換されています。

### Null リテラル
<a name="sch-mapping-datatypes-null"></a>

JSON 入力ストリームの null リテラル (例: `"City":null`) は、送信先のデータ型に関係なく、SQL null に変換されます。

### ブールリテラル
<a name="sch-mapping-datatypes-boolean"></a>

JSON 入力ストリームのブールリテラル (例: `"Contacted":true`) は、次のように SQL データに変換されます。
+ 数値型 (DECIMAL、INT など): `true` は 1、`false` は 0 に変換されます。
+ バイナリ (BINARY または VARBINARY): 
  + `true`: 結果には、設定されている最下位ビットとクリアされた残りのビットが表示されます。
  + `false`: 結果にはクリアされたビットがすべて表示されます。

  VARBINARY へ変換すると、1 バイトのデータ値になります。
+ BOOLEAN: 対応する SQL BOOLEAN 値に変換されます。
+ 文字型 (CHAR または VARCHAR): 対応する文字列値 (`true` または `false`) に変換されます。値は、フィールドの長さに合わせて切り捨てられます。
+ 日時型 (DATE、TIME、TIMESTAMP): 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

### Number
<a name="sch-mapping-datatypes-number"></a>

JSON 入力ストリームの数値リテラル (例: `"CustomerId":67321`) は、次のように SQL データに変換されます。
+ 数値 (DECIMAL、INT など): 直接変換されます。変換後の値が、対象のデータ型 (つまり、`123.4` を INT に変換) のサイズまたは精度を超過した場合、変換は失敗し、強制エラーがエラーストリームに書き込まれます。
+ バイナリ (BINARY または VARBINARY): 変換は失敗し、強制エラーがエラーストリームに書き込まれます。
+ BOOLEAN: 
  + `0`: `false` に変換します。
  + 他のすべての数値: `true` に変換します。
+ 文字 (CHAR または VARCHAR): 数値の文字列表現に変換します。
+ 日時型 (DATE、TIME、TIMESTAMP): 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

### String
<a name="sch-mapping-datatypes-string"></a>

JSON 入力ストリームの文字列値 (例: `"CustomerName":"John Doe"`) は、SQL データを次のように変換します。
+ 数値 (DECIMAL、INT など): Amazon Kinesis Data Analytics は値をターゲットデータ型に変換しようとします。値を変換できない場合、変換が失敗し、強制エラーがエラーストリームに書き込まれます。
+ バイナリ (BINARY または VARBINARY): ソース文字列が有効なバイナリリテラル (つまり、`X'3F67A23A'`、偶数 f) の場合、値は対象のデータ型に変換されます。それ以外の場合、変換は失敗し、強制エラーがエラーストリームに書き込まれます。
+ BOOLEAN: ソース文字列が `"true"` の場合、`true` に変換されます。この比較では、大文字小文字を区別しません。それ以外の場合は、`false` に変換されます。
+ 文字型 (CHAR または VARCHAR): 入力の文字列値に変換します。対象のデータ型よりも値が長い場合は切り捨てられるため、エラーストリームに書き込まれるエラーはありません。
+ 日時型 (DATE、TIME または TIMESTAMP): ソース文字列がターゲット値に変換できる形式の場合、値は変換されます。それ以外の場合、変換は失敗し、強制エラーがエラーストリームに書き込まれます。

  有効な日時型形式は以下のとおりです。
  + "1992-02-14"
  + "1992-02-14 18:35:44.0"

### 配列またはオブジェクト
<a name="sch-mapping-datatypes-array"></a>

JSON 入力ストリームの配列またはオブジェクトは、次のように SQL データに変換されます。
+ 文字型 (CHAR または VARCHAR): 配列またはオブジェクトのソーステキストに変換します。「[配列へのアクセス](about-json-path.md#about-json-path-arrays)」を参照してください。
+ それ以外のデータ型: 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

JSON 配列の例については、「[JSONPath の操作](about-json-path.md)」を参照してください。

## 関連トピック
<a name="sch-mapping.Related"></a>
+ [アプリケーション入力の設定](how-it-works-input.md)
+ [データ型](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-data-types.html)
+ [スキーマエディタの使用](console-summary-edit-schema.md)
+ [CreateApplication](API_CreateApplication.md)
+ [RecordColumn](API_RecordColumn.md)
+ [SourceSchema](API_SourceSchema.md)

# ストリーミングデータのスキーマ検出機能の使用
<a name="sch-dis"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

ストリーミング入力のレコードがアプリケーション内ストリームにどのようにマッピングされるかを説明する入力スキーマを指定すると、面倒でエラーが発生しやすくなります。「[DiscoverInputSchema](API_DiscoverInputSchema.md)」 API (*検出 API* と呼ばれます) を使用してスキーマを推測できます。ストリーミングソースのレコードのランダムなサンプルを使用して、API はスキーマ (列名、データ型、受信データ内のデータ要素の位置) を推測できます。

**注記**  
検出 API を使用して Amazon S3 に格納されたファイルからスキーマを生成する方法については、「[静的データに対するスキーマ検出機能の使用](sch-dis-ref.md)」を参照してください。

コンソールは Discovery API を使用して、指定されたストリーミングソースのスキーマを生成します。コンソールを使用して、列の追加や削除、列名やデータ型の変更など、スキーマを更新することもできます。ただし、無効なスキーマを作成しないように、変更は注意深く実行します。

アプリケーション内ストリームのスキーマをファイナライズした後、文字列値と日時値の操作に使用できる関数があります。結果のアプリケーション内ストリームの行で作業をする場合は、このような関数をアプリケーションコードで使用します。詳細については、「[例: DateTime 値の変換](app-string-datetime-manipulation.md)」を参照してください。

## スキーマ検出時に命名する列
<a name="sch-dis-column-names"></a>

スキーマ検出時、 Amazon Kinesis Data Analytics は、ストリーミング入力ソースのオリジナルの列名をできるだけ保持しようとします。ただし、次の場合を除きます。
+ ソースストリームの列名は、予約された SQL キーワード (例: `TIMESTAMP`、`USER`、`VALUES`、`YEAR`) です。
+ ソースストリーム列名に無効な文字が含まれています。文字、数字、下線文字 (\$1) のみサポートされています。
+ ソースストリーム列名の先頭が数字になっています。
+ ソースストリーム列名の文字数が 100 文字を超えています。

列名を変更した場合、変更後のスキーマ列は `COL_` で始まります。名前全体が無効な文字の場合など、元の列名を保持することができない場合があります。このような場合、列の名前は `COL_#` に変更されます。「\$1」には、列の順序内の場所を示す数値が入ります。

検出が完了すると、コンソールを使用してスキーマを更新し、列の追加または削除、列名、データ型、データサイズの変更を行うことができます。

### 検出が推奨される列名の例
<a name="sch-dis-column-names-examples"></a>


****  

| ソースストリームの列名 | 検出推奨列名 | 
| --- | --- | 
|  USER  |  COL\$1USER  | 
|  USER@DOMAIN  |  COL\$1USERDOMAIN  | 
|  @@  |  COL\$10  | 

## スキーマ検出の問題
<a name="sch-dis-when-fails"></a>

Kinesis Data Analytics が任意のストリーミングソースのスキーマを推測しない場合、どうなるでしょうか。

Kinesis Data Analytics は、CSV や JSON のような、UTF-8 でエンコードされた列形式に対してスキーマを推測します。Kinesis Data Analytics は、(アプリケーションログやカスタムの列および行区切りを持つレコードなどの未加工テキストを含む) UTF-8 でエンコードされたレコードをサポートしています。Kinesis Data Analytics がスキーマを推測しない場合は、コンソールのスキーマエディタ (または API) を使用して、手動でスキーマを定義できます。

 データがパターンに合わない場合は (スキーマエディタを使用して指定できます)、スキーマを VARCHAR(N) 型の単一列として定義できます。N はレコードに含まれる最大文字数です。ここから、文字列および日付時刻操作を使用して、データをアプリケーション内ストリームに入力された後に構築できます。例については「[例: DateTime 値の変換](app-string-datetime-manipulation.md)」を参照してください。

# 静的データに対するスキーマ検出機能の使用
<a name="sch-dis-ref"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

スキーマ検出機能は、Amazon S3 バケットに格納されている静的ファイルのストリームやデータからスキーマを生成できます。参照する場合や、ストリーミングデータを利用できない場合に、Kinesis Data Analytics アプリケーションのスキーマを生成するとします。ストリーミングデータまたはリファレンスデータで期待される形式のデータのサンプルを含む静的ファイルで、スキーマ検出機能を使用できます。Kinesis Data Analytics は、Amazon S3 バケットに格納されている JSON ファイルまたは CSV ファイルのサンプルデータに対してスキーマ検出を実行できます。データファイルでスキーマ検出を使用するには、コンソールか、[DiscoverInputSchema](API_DiscoverInputSchema.md) パラメータを指定した `S3Configuration` API を使用します。

## コンソールを使用したスキーマ検出を実行する
<a name="sch-dis-ref-console"></a>

コンソールを使用して静的ファイルで検出を実行するには、以下の操作を行います。

1. リファレンスデータオブジェクトを S3 バケットに追加します。

1. Kinesis Data Analytics コンソールで、アプリケーションのメインページの [**リファレンスデータを接続**] を選択します。

1. リファレンスデータを含む Amazon S3 オブジェクトにアクセスするために、バケット、パス、IAM ロールデータを指定します。

1. [**スキーマの検出**] を選択します。

コンソールでリファレンスデータを追加し、スキーマを検出する方法の詳細については、「[例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する](app-add-reference-data.md)」を参照してください。

## API を使用したスキーマ検出を実行する
<a name="sch-dis-ref-api"></a>

API を使用して静的ファイルで検出を実行するには、API に以下の情報を含む `S3Configuration` 構造を指定します。
+ `BucketARN`: ファイルを含む Amazon S3 バケットの Amazon リソースネーム (ARN)。Amazon S3 バケット ARN の形式については、「[Amazon リソースネーム (ARN) と Amazon サービスの名前空間: Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-s3)」を参照してください。
+ `RoleARN`: `AmazonS3ReadOnlyAccess` ポリシーを持つ IAM ロールの ARN。ロールにポリシーを追加する方法については、「[ロールの修正](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html)」を参照してください。
+ `FileKey`: オブジェクトのファイル名。

**`DiscoverInputSchema` API を使用して Amazon S3 オブジェクトからスキーマを生成するには**

1.  AWS CLI が設定されていることを確認します。詳細については、「はじめに」セクションの「[ステップ 2: AWS Command Line Interface (AWS CLI) を設定する](setup-awscli.md)」を参照してください。

1. 次の内容で、`data.csv` という名前のファイルを作成します。

   ```
   year,month,state,producer_type,energy_source,units,consumption
   2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
   2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
   2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
   2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
   2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681
   ```

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) にサインインします。

1. Amazon S3 バケットを作成し、作成した `data.csv` ファイルをアップロードします。作成されたバケットの ARN に注意してください。Amazon S3 バケットの作成およびファイルのアップロードの詳細については、「[Amazon Simple Storage Service の開始方法](https://docs.aws.amazon.com/AmazonS3/latest/userguide/GetStartedWithS3.html)」を参照してください。

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。`AmazonS3ReadOnlyAccess` ポリシーを使用してロールを作成します。新しいロールの ARN に注意してください。ロールの作成の詳細については、「[Amazon　Service にアクセス許可を委任するロールの作成](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html)」を参照してください。ロールにポリシーを追加する方法については、「[ロールの修正](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_manage_modify.html)」を参照してください。

1. で次の`DiscoverInputSchema`コマンドを実行し AWS CLI、Amazon S3 バケットと IAM ロールの ARNs を置き換えます。

   ```
   $aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN": "arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }' 
   ```

1. 応答は次の例のようになります。

   ```
   {
       "InputSchema": {
           "RecordEncoding": "UTF-8",
           "RecordColumns": [
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_year"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "COL_month"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "state"
               },
               {
                   "SqlType": "VARCHAR(64)",
                   "Name": "producer_type"
               },
               {
                   "SqlType": "VARCHAR(4)",
                   "Name": "energy_source"
               },
               {
                   "SqlType": "VARCHAR(16)",
                   "Name": "units"
               },
               {
                   "SqlType": "INTEGER",
                   "Name": "consumption"
               }
           ],
           "RecordFormat": {
               "RecordFormatType": "CSV",
               "MappingParameters": {
                   "CSVMappingParameters": {
                       "RecordRowDelimiter": "\r\n",
                       "RecordColumnDelimiter": ","
                   }
               }
           }
       },
       "RawInputRecords": [
           "year,month,state,producer_type,energy_source,units,consumption\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
       ],
       "ParsedInputRecords": [
           [
               null,
               null,
               "state",
               "producer_type",
               "energy_source",
               "units",
               null
           ],
           [
               "2001",
               "1",
               "AK",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "47615"
           ],
           [
               "2001",
               "1",
               "AK",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "16535"
           ],
           [
               "2001",
               "1",
               "AK",
               "CombinedHeatandPowerElectricPower",
               "Coal",
               "ShortTons",
               "22890"
           ],
           [
               "2001",
               "1",
               "AL",
               "TotalElectricPowerIndustry",
               "Coal",
               "ShortTons",
               "3020601"
           ],
           [
               "2001",
               "1",
               "AL",
               "ElectricGeneratorsElectricUtilities",
               "Coal",
               "ShortTons",
               "2987681"
           ]
       ]
   }
   ```

# Lambda 関数を使用したデータの事前処理
<a name="lambda-preprocessing"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

ストリーム内のデータに形式変換、変換、エンリッチメント、フィルタリングが必要な場合は、 AWS Lambda 関数を使用してデータを前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。

Lambda 関数によるレコードの事前処理は、次のシナリオで役立ちます。
+ 他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現在 JSON データ形式または CSV データ形式をサポートしています。
+ 集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。
+ 外挿やエラー修正などの他の Amazon サービスによるデータの強化。
+ レコードのフィールドに複雑な文字列変換を適用します。
+ データをクリーンアップするためのデータフィルタリング。

## レコードを事前処理するための Lambda 関数の使用
<a name="lambda-preprocessing-use"></a>

Kinesis Data Analytics アプリケーションを作成するときは、[**ソースに接続**] ページで Lambda 事前処理を有効にします。

**Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) で Managed Service for Apache Flink コンソールを開きます。

1. アプリケーションの [**ソースに接続**] ページの [**レコード事前処理**] セクションで [**有効化 AWS Lambda**] を選択します。

1. 既に作成した Lambda 関数を使用するには、[**Lambda 関数**] ドロップダウンリストで関数を選択します。

1. Lambda 事前処理テンプレートの 1 つから新規の Lambda 関数を作成する場合は、ドロップダウンリストからテンプレートを選択します。次に、[**View <template name> in Lambda (Lambda で <テンプレート名> を表示)**] を選択して関数を編集します。

1. 新しい Lambda 関数を作成するには、[**新規作成**] を選択します。Lambda 関数の作成については、AWS Lambda 開発者ガイドの「[HelloWorld Lambda 関数を作成してコンソールを探る](https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html)」を参照してください。

1. 使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[**\$1LATEST**] を選択します。

レコードの事前処理に Lambda 関数を選択または作成すると、アプリケーションの SQL コードがレコードからスキーマを実行したり、アプリケーションがレコードからスキーマを生成したりする前に、レコードが事前処理されます。

## Lambda 事前処理アクセス権限
<a name="lambda-preprocessing-policy"></a>

Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。

```
     {
       "Sid": "UseLambdaFunction",
       "Effect": "Allow",
       "Action": [
           "lambda:InvokeFunction",
           "lambda:GetFunctionConfiguration"
       ],
       "Resource": "<FunctionARN>"
   }
```

## Lambda 事前処理メトリクス
<a name="lambda-preprocessing-metrics"></a>

Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Lambda の事前処理で出力される CloudWatch メトリクスについては、「[Amazon Kinesis Analytics のメトリクス](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。

## Kinesis プロデューサーライブラリ AWS Lambda での の使用
<a name="lambda-preprocessing-deaggregation"></a>

[Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用できます。Kinesis Client Library (KCL) for Java は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマー AWS Lambda として を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。

必要なプロジェクトコードと手順については、GitHub で [AWS Lambda用の Kinesis プロデューサーライブラリの集約解除モジュール](https://github.com/awslabs/kinesis-deaggregation)について参照してください。このプロジェクトのコンポーネントを使用して、Java、Node.js、Python AWS Lambda で 内の KPL シリアル化されたデータを処理できます。これらのコンポーネントは、[複数言語 KCL アプリケーション](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java)の一部として使用することもできます。

## データ事前処理イベント入力データモデル / レコードレスポンスモデル
<a name="lambda-preprocessing-data-model"></a>

レコードを事前処理するには、Lambda 関数が、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

### イベント入力データモデル
<a name="lambda-preprocessing-request-model"></a>

Kinesis Data Analytics は、Kinesis データストリームまたは Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。

事前処理関数への入力モデルは、データが Kinesis データストリームから受信されたか、Firehose 配信ストリームから受信されたかによってわずかに異なります。

ソースが Firehose 配信ストリームの場合、イベント入力データモデルは次のようになります。

**Kinesis Data Firehose のリクエストデータモデル**


| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| invocationId | Lambda 呼び出し ID (ランダム GUID)。 | 
| applicationArn | Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN) | 
| streamArn | 配信ストリーム ARN | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | レコード ID (ランダム GUID) | 
| kinesisFirehoseRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 でエンコードされたソースレコードのペイロード | 
| approximateArrivalTimestamp | 配信ストリームレコードの概算到着時間 | 

次の例は、Firehose 配信ストリームからの入力を示しています。

```
{
   "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2",
   "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
   "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test",
   "records":[
      {
         "recordId":"49572672223665514422805246926656954630972486059535892482",
         "data":"aGVsbG8gd29ybGQ=",
         "kinesisFirehoseRecordMetadata":{
            "approximateArrivalTimestamp":1520280173
         }
      }
   ]
}
```

ソースが Kinesis データストリームの場合、イベント入力データモデルは次のとおりです。

**Kinesis ストリームのリクエストデータモデル**


| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| invocationId | Lambda 呼び出し ID (ランダム GUID)。 | 
| applicationArn | Kinesis Data Analytics アプリケーション ARN | 
| streamArn | 配信ストリーム ARN | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | Kinesis レコードのシーケンス番号に基づいたレコード ID | 
| kinesisStreamRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | Base64 でエンコードされたソースレコードのペイロード | 
| sequenceNumber | Kinesis ストリームレコードからのシーケンス番号 | 
| partitionKey | Kinesis ストリームレコードからのパーティションキー | 
| shardId | Kinesis ストリームレコードからの ShardId | 
| approximateArrivalTimestamp | 配信ストリームレコードの概算到着時間 | 

次の例は、Kinesis データストリームからの入力を示しています。

```
{
  "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2",
  "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test",
  "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test",
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "data": "aGVsbG8gd29ybGQ=",
      "kinesisStreamRecordMetadata":{
            "shardId" :"shardId-000000000003",
            "partitionKey":"7400791606",
            "sequenceNumber":"49572672223665514422805246926656954630972486059535892482",
            "approximateArrivalTimestamp":1520280173
         }
    }
  ]
}
```

### レコードレスポンスモデル
<a name="lambda-preprocessing-response-model"></a>

Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。レコードには次のパラメータが含まれている必要があります。含まれていない場合、Kinesis Data Analytics がレコードを拒否し、データ事前処理を失敗とみなします。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。

**レスポンスデータモデル**


| フィールド | 説明 | 
| --- | --- | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| recordId | レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ事前処理の失敗として扱われます。 | 
| result | レコードのデータ変換のステータス。指定できる値は以下のとおりです。[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/lambda-preprocessing.html)  | 
| data | base64 エンコード後の変換されたデータペイロード。アプリケーションの取り込みデータ形式が JSON である場合、各データペイロードには複数の JSON ドキュメントを含めることができます。または、アプリケーションの取り込みデータ形式が CSV である場合、それぞれに複数の CSV 行を含めることができます (各行には行の区切り文字が入ります)。Kinesis Data Analytics サービスは、同じデータペイロード内の複数の JSON ドキュメントまたは CSV 行のいずれかを使用して、データを正常に解析して処理します。 | 

次の例は、Lambda 関数からの出力を示しています。

```
{
  "records": [
    {
      "recordId": "49572672223665514422805246926656954630972486059535892482",
      "result": "Ok",
      "data": "SEVMTE8gV09STEQ="
    }
  ]
}
```

## 一般的なデータ事前処理の失敗
<a name="lambda-preprocessing-failures"></a>

事前処理が失敗する一般的な理由は次のとおりです。
+ Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。
+ レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、`Dropped` または `ProcessingFailed` レコードの場合はオプションです。
+ Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。
+ Lambda 関数のレスポンスが、 AWS Lambda サービスによって定められたレスポンスの上限を超えています。

データの事前処理が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から洞察を得ることができます。
+ Kinesis Data Analytics アプリケーション (`MillisBehindLatest`): アプリケーションの読み取りがストリーミングソースからどれだけ離れているかを示します。
+ Kinesis Data Analytics アプリケーション (`InputPreprocessing`) の CloudWatch メトリクス: 統計の中でも、特に成功と失敗の数を示します。詳細については、「[Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。
+ AWS Lambda 関数 CloudWatch メトリクスとログ。

# 事前処理用の Lambda 関数の作成
<a name="lambda-preprocessing-functions"></a>

Amazon Kinesis Data Analytics アプリケーションは、アプリケーションに取り込まれる際に、Lambda 関数をレコードの事前処理に使用できます。Kinesis Data Analytics では、データの事前処理用の開始点として使用するため、コンソールで以下のテンプレートが用意されています。

**Topics**
+ [Node.js での事前処理 Lambda 関数の作成](#lambda-preprocessing-functions-nodejs)
+ [Python での事前処理 Lambda 関数の作成](#lambda-preprocessing-functions-python)
+ [Java での事前処理 Lambda 関数の作成](#lambda-preprocessing-functions-java)
+ [.NET での事前処理 Lambda 関数の作成](#lambda-preprocessing-functions-net)

## Node.js での事前処理 Lambda 関数の作成
<a name="lambda-preprocessing-functions-nodejs"></a>

事前処理の Lambda 関数を Node.js で作成するための次のテンプレートが Kinesis Data Analytics コンソールで利用できます。


| Lambda の設計図 | 言語とバージョン | 説明 | 
| --- | --- | --- | 
| 一般的な Kinesis Data Analytics の入力処理  | Node.js 6.10 |  JSON レコードまたは CSV レコードを入力として受け取り、処理ステータスで返す Kinesis Data Analytics レコードのプリプロセッサ。このプロセッサをカスタム変換ロジックの開始点として使用します。  | 
| 圧縮入力処理 | Node.js 6.10 | 圧縮された (圧縮 GZIP または Deflate) JSON レコードまたは CSV レコードを入力として受け取り、圧縮解除されたレコードを処理ステータスで返す Kinesis Data Analytics のレコードプロセッサ。 | 

## Python での事前処理 Lambda 関数の作成
<a name="lambda-preprocessing-functions-python"></a>

事前処理の Lambda 関数を Python で作成するための次のテンプレートがコンソールで利用できます。


| Lambda の設計図 | 言語とバージョン | 説明 | 
| --- | --- | --- | 
| 一般的な Kinesis Analytics の入力処理  | Python 2.7 |  JSON レコードまたは CSV レコードを入力として受け取り、処理ステータスで返す Kinesis Data Analytics レコードのプリプロセッサ。このプロセッサをカスタム変換ロジックの開始点として使用します。  | 
| KPL 入力処理 | Python 2.7 | Kinesis Producer Library (KPL) の JSON レコードまたは CSV レコードの集計を入力として受け取り、処理ステータスと一緒に集約解除されたレコードを返す Kinesis Data Analytics のレコードプロセッサ。 | 

## Java での事前処理 Lambda 関数の作成
<a name="lambda-preprocessing-functions-java"></a>

Java でレコードの事前処理用の Lambda 関数を作成するには、[Java イベント](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events)クラスを使用します。

次のコードは、Java を使用してレコードを事前処理する Lambda 関数のサンプルを示しています。

```
public class LambdaFunctionHandler implements
        RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(
            KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("StreamArn is : " + event.streamArn);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>();
        KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
             // Add your record.data pre-processing logic here.                               
            // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>));
        });
        return response;
    }

}
```

## .NET での事前処理 Lambda 関数の作成
<a name="lambda-preprocessing-functions-net"></a>

.NET でレコードの事前処理用の Lambda 関数を作成するには、[.NET イベント](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)クラスを使用します。

次のコードは、C\$1 を使用してレコードを前処理する Lambda 関数のサンプルを示しています。

```
public class Function
    {
        public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsInputPreprocessingResponse
            {
                Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
                context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
                context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add your record preprocessig logic here.

                var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsInputPreprocessingResponse.OK
                };
                preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
                response.Records.Add(preprocessedRecord);
            }
            return response;
        }
    }
```

事前処理および宛先の Lambda 関数を .NET で作成する場合の詳細については、「[https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)」を参照してください。

# スループットの増加に合わせた入力ストリームの並列処理
<a name="input-parallelism"></a>

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「[制限](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)」を参照してください。

Amazon Kinesis Data Analytics アプリケーションでは、アプリケーション内入力ストリームのスループットを超えるアプリケーションをスケーリングするために、複数のアプリケーション内入力ストリームをサポートできます。アプリケーション内入力ストリームの詳細については、「[Amazon Kinesis Data Analytics for SQL Applications: 仕組み](how-it-works.md)」を参照してください。

ほとんどの場合、Amazon Kinesis Data Analytics では、アプリケーションにフィードされる Kinesis ストリームまたは Firehose ソースストリームの容量を処理できるように、アプリケーションがスケールされます。ただし、ソースストリームのスループットが、単一のアプリケーション内入力ストリームのスループットを超える場合は、アプリケーションで使用されるアプリケーション内入力ストリームの数を明示的に増やすことができます。そのためには、`InputParallelism` パラメータを使用します。

`InputParallelism` パラメータが 1 以上の場合、Amazon Kinesis Data Analytics は、アプリケーション内ストリーム間のソースストリームのパーティションを均等に分割します。たとえば、ソースストリームに 50 シャードあり、`InputParallelism` を `2` に設定した場合、アプリケーション内入力ストリームはそれぞれ、25 のソースストリームのシャードから入力を受け取ります。

アプリケーション内ストリームの数を増やす場合は、アプリケーションから、各ストリームのデータに明示的にアクセスする必要があります。コードで複数のアプリケーション内ストリームにアクセスする方法については、「[Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス](#input-parallelism-code-example)」を参照してください。

Kinesis Data Streams と Firehose ストリームのシャードは、どちらも同様にアプリケーション内ストリーム間で分割されますが、アプリケーションに認識される形式は異なります。
+ Kinesis データストリームのレコードには、`shard_id` フィールドが含まれており、レコードのソースシャードを識別できます。
+ Firehose 配信ストリームのレコードには、レコードのソースシャードまたはパーティションを識別するフィールドは含まれていません。これは、Firehose がこの情報をアプリケーションから抽象化するためです。

## アプリケーション内入力ストリームの数の増加を評価する
<a name="input-parallelism-evaluating"></a>

ほとんどの場合、入力ストリームの複雑性やデータサイズに応じて、1 つのアプリケーション内入力ストリームで、1 つのソースストリームのスループットを処理することができます。アプリケーション内入力ストリームの数を増やす必要の有無を判断するには、Amazon CloudWatch の `InputBytes` および `MillisBehindLatest` メトリクスをモニタリングします。

`InputBytes` メトリクスが 100 MB/秒より大きい場合 (または、このレートより大きくなることが予想される場合)、`MillisBehindLatest` が増えたり、アプリケーションの問題の影響が大きくなったりする可能性があります。この問題に対応するため、アプリケーションに対して次の言語を選択することをお勧めします。
+ アプリケーションのスケーリングニーズが 100 MB/秒を超える場合は、複数のストリームと Kinesis Data Analytics for SQL アプリケーションを使用します。
+ 1 つのストリームとアプリケーションを使用する場合は、[Kinesis Data Analytics for Java Applications](/managed-flink/latest/java/what-is.html) を使用します。

`MillisBehindLatest` メトリクスに次のいずれかの特性がある場合は、アプリケーションの `InputParallelism` 設定を増やす必要があります。
+ `MillisBehindLatest` メトリクスが増加しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データが遅延していることを意味します。
+ `MillisBehindLatest` メトリクスは一貫して 1,000 (1 秒あたり) を超えています。

以下が真の場合は、アプリケーションの `InputParallelism` 設定を増やす必要はありません。
+ `MillisBehindLatest` メトリクスが減少しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データの遅れを取り戻していることを意味します。
+ `MillisBehindLatest` メトリクスは一貫して 1,000 (1 秒あたり) を下回っています。

CloudWatch の使用方法の詳細については、「[CloudWatch ユーザーガイド](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)」を参照してください。

## 複数のアプリケーション内入力ストリームの実装
<a name="input-parallelism-implementing"></a>

「[CreateApplication](API_CreateApplication.md)」を使用してアプリケーションを作成する際、アプリケーション内入力ストリームの数を設定できます。この数は、「[UpdateApplication](API_UpdateApplication.md)」を使用してアプリケーションを作成した後に設定します。

**注記**  
`InputParallelism` 設定は、Amazon Kinesis Data Analytics API または AWS CLIを使ってのみ設定できます。を使用してこの設定を設定することはできません AWS マネジメントコンソール。のセットアップについては AWS CLI、「」を参照してください[ステップ 2: AWS Command Line Interface (AWS CLI) を設定する](setup-awscli.md)。

### 新しいアプリケーションの入力ストリームカウントの設定
<a name="input-parallelism-implementing-create"></a>

次の例では、API アクション (`CreateApplication`) を使用して、新しいアプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。

`CreateApplication` の詳細については、「[CreateApplication](API_CreateApplication.md)」を参照してください。

```
{
   "ApplicationCode": "<The SQL code the new application will run on the input stream>",
   "ApplicationDescription": "<A friendly description for the new application>",
   "ApplicationName": "<The name for the new application>",
   "Inputs": [ 
    { 
      "InputId": "ID for the new input stream",
      "InputParallelism": { 
        "Count": 2
    }],
   "Outputs": [ ... ],
	}]
}
```

### 既存アプリケーションの入力ストリームカウントの設定
<a name="input-parallelism-implementing-update"></a>

次の例では、API アクション (`UpdateApplication`) を使用して、既存アプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。

`Update_Application` の詳細については、「[UpdateApplication](API_UpdateApplication.md)」を参照してください。

```
{
   "InputUpdates": [ 
      { 
         "InputId": "yourInputId",
         "InputParallelismUpdate": { 
            "CountUpdate": 2
         }
      }
   ],
}
```

## Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス
<a name="input-parallelism-code-example"></a>

複数のアプリケーション内入力ストリームをアプリケーションで使用するには、別のストリームから明示的に選択する必要があります。次のコード例では、入門チュートリアルで作成した複数の入力ストリームをアプリケーションでクエリを行う方法について説明します。

次の例では、`in_application_stream001` という 1 つのアプリケーション内ストリームに結合される前に、まず [COUNT](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-count.html) を使用して各ソースストリームが集約されます。事前にソースストリームを集約すると、結合されたアプリケーション内ストリームで、負荷をかけ過ぎることなく複数のストリームからのトラフィックを処理しやすくなります。

**注記**  
この例を実行して、両方のアプリケーション内入力ストリームから結果を得るには、ソースストリームのシャード数とアプリケーションの `InputParallelism` パラメータを両方とも更新します。

```
CREATE OR REPLACE STREAM in_application_stream_001 (
    ticker VARCHAR(64),
    ticker_count INTEGER
);

CREATE OR REPLACE PUMP pump001 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_001
GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol; 
        
CREATE OR REPLACE PUMP pump002 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_002
GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol;
```

前述のコード例では、以下のような出力を `in_application_stream001` に生成します。

![\[Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/images/input-parallelism-results.png)


## 追加の考慮事項
<a name="input-parallelism-considerations"></a>

複数の入力ストリームを使用する場合は、以下の点に注意してください。
+ アプリケーション内入力ストリームの最大数は 64 です。
+ アプリケーション内入力ストリームは、アプリケーションの入力ストリームのシャード間で均等に分散されます。
+ アプリケーション内ストリームの追加により向上するパフォーマンスは、直線的にスケールしません。つまり、アプリケーション内ストリームの数を 2 倍にしても、スループットは 2 倍になりません。一般的な行サイズを使用すると、アプリケーション内ストリームはそれぞれ、1 秒あたり約 5,000～15,000 行のスループットを達成します。アプリケーション内ストリームカウントを 10 に増やすことによって、1 秒あたり 20,000～30,000 行のスループットを達成できます。スループット速度は、入力ストリームのフィールドのカウント、データ型、サイズによって異なります。
+ 一部の集計関数 ([AVG](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-avg.html)) では、別のシャードに分割されている入力ストリームに適用されると、予期しない結果が生成される場合があります。集計ストリームに結合する前に、個々のシャードで集計オペレーションを実行する必要があるため、レコードが多く含まれているストリームに関係なく加重される場合があります。
+ 入力ストリームの数を増やした後にアプリケーションのパフォーマンスが低下し続ける (高い `MillisBehindLatest` メトリクスにより反映される) 場合は、Kinesis 処理ユニット (KPU) の上限に達している可能性があります。詳細については、「[アプリケーションを自動的にスケーリングしてスループットを向上させる](how-it-works-autoscaling.md)」を参照してください。

# アプリケーションコード
<a name="how-it-works-app-code"></a>

アプリケーションコードは、入力を処理し出力を生成する一連の SQL ステートメントです。この SQL ステートメントはアプリケーション内ストリームおよびリファレンステーブルで動作します。詳細については、「[Amazon Kinesis Data Analytics for SQL Applications: 仕組み](how-it-works.md)」を参照してください。

Kinesis Data Analytics でサポートされている SQL 言語要素の詳細については、「[Amazon Kinesis Data Analytics SQL Reference](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)」を参照してください。

リレーショナルデータベースでは、レコードを追加する INSERT ステートメントと、データをクエリする SELECT ステートメントを使用して、テーブルで作業を行います。Amazon Kinesis Data Analytics では、ストリームを操作します。これらのストリームをクエリする SQL ステートメントを作成できます。1 つのアプリケーション内ストリームをクエリした結果は、常に別のアプリケーション内ストリームに送信されます。複雑な分析を実行する場合は、分析の中間結果を保持する複数のアプリケーション内ストリームを作成する場合があります。最終的には、アプリケーション出力を設定して、(1 つまたは複数のアプリケーション内ストリームからの) 最終分析を外部宛先で永続化します。要約すると、アプリケーションコードを作成する一般的なパターンは以下のとおりです。
+ SELECT ステートメントは、常に INSERT ステートメントのコンテキストで使用されます。つまり、行を選択すると、結果を別のアプリケーション内ストリームに挿入します。
+ INSERT ステートメントは、常にポンプのコンテキストで使用されます。つまり、ポンプを使用してアプリケーション内ストリームに書き込みます。



次のアプリケーションコードの例では、あるアプリケーション内 (SOURCE\$1SQL\$1STREAM\$1001) ストリームからレコードを読み取り、別のアプリケーション内ストリーム (DESTINATION\$1SQL\$1STREAM) に書き込みます。次のように、ポンプを使用してレコードをアプリケーション内ストリームに挿入できます。

```
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), 
                                                   change DOUBLE, 
                                                   price DOUBLE);
-- Create a pump and insert into output stream.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS 

  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM ticker_symbol, change,price
    FROM   "SOURCE_SQL_STREAM_001";
```

**注記**  
ストリーム名および列名に指定する識別子は標準 SQL の命名規則に従います。たとえば、識別子を引用符で囲むと、識別子で大文字と小文字が区別されるようになります。囲まない場合は、識別子はデフォルトで大文字になります。識別子の詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「[Identifiers](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html)」を参照してください。

アプリケーションコードは、多数の SQL ステートメントで構成できます。以下に例を示します。
+ 1 つの SQL ステートメントの結果を次の SQL ステートメントにフィードするシーケンシャルな SQL クエリを作成できます。
+ また、相互に独立して実行される SQL クエリを作成することもできます。たとえば、同じアプリケーション内ストリームをクエリするが、異なるアプリケーション内ストリームに出力を送信する、2 つの SQL ステートメントを作成できます。その後、新しく作成されたアプリケーション内ストリームを個別にクエリできます。

中間結果を保存するアプリケーション内ストリームを作成できます。ポンプを使用してアプリケーション内ストリームにデータを挿入します。詳細については、「[アプリケーション内ストリームとポンプ](streams-pumps.md)」を参照してください。

アプリケーション内リファレンステーブルを追加する場合は、アプリケーション内ストリームとリファレンステーブルのデータを結合する SQL を作成できます。詳細については、「[例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する](app-add-reference-data.md)」を参照してください。

アプリケーションの出力設定に従って、Amazon Kinesis Data Analytics はアプリケーションの出力設定に従って特定のアプリケーション内ストリームからのデータを外部宛先に書き込みます。アプリケーションコードが、出力設定で指定されたアプリケーション内ストリームに書き込むことを確認してください。

詳細については、以下の各トピックを参照してください。
+  [ストリーミング SQL の概念](streaming-sql-concepts.md)
+ [Amazon Kinesis Data Analytics SQL Reference](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)

# アプリケーション出力の設定
<a name="how-it-works-output"></a>



アプリケーションコードでは、SQL ステートメントの出力を 1 つ以上のアプリケーション内ストリームに書き込みます。必要に応じて、出力設定をアプリケーションに追加できます。 は、アプリケーション内ストリームに書き込まれたすべてを Amazon Kinesis データストリーム、Firehose 配信ストリーム、 AWS Lambda 関数などの外部宛先に保持します。

アプリケーション出力の永続化に使用できる外部宛先の数には制限があります。詳細については、「[制限](limits.md)」を参照してください。

**注記**  
エラーを精査するためにアプリケーション内エラーストリームのデータを永続化する外部宛先は、1 つにすることをお勧めします。



これらの出力設定ごとに、以下を指定します。
+ **アプリケーション内ストリーム名** – 外部宛先で永続化するストリームです。

  Kinesis Data Analytics は、出力設定で指定されたアプリケーション内ストリームを検索します。(ストリーム名では大文字と小文字が区別され、正確に一致する必要があります）。アプリケーションコードでこのアプリケーション内ストリームが作成されていることを確認します。
+ **外部宛先** – データを Kinesis データストリーム、Firehose データ配信ストリーム、または Lambda 関数に永続化できます。ストリームまたは関数の Amazon リソースネーム (ARN) を指定します。また、Amazon Kinesis Analytics がユーザーに代わってストリームまたは Lambda 関数に書き込むために引き受けることができる IAM ロールも指定します。外部宛先に書き込むときに Kinesis Data Analytics が使用するレコード形式 (JSON, CSV) も記述します。

Kinesis Data Analytics でストリーミングまたは Lambda 宛先に書き込むことができない場合、サービスは無限に試行を続けます。これはバックプレッシャーを生み出し、アプリケーションに遅延が生じます。この問題が解決しない場合、アプリケーションは最終的に新しいデータの処理を停止します。[Kinesis Data Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html) をモニタリングし、障害のアラームを設定できます。メトリクスとアラームの詳細については、[Amazon CloudWatch メトリクスを使用する](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html)と[Amazon CloudWatch アラームを作成する](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html)を参照してください。

 AWS マネジメントコンソールを使用してアプリケーション出力を設定できます。コンソールは API コールを実行して設定を保存します。

## を使用した出力の作成 AWS CLI
<a name="how-it-works-output-cli"></a>

このセクションでは、`CreateApplication` または `AddApplicationOutput` オペレーションのリクエストボディの `Outputs` セクションを作成する方法について説明します。

### Kinesis ストリーム出力を作成する
<a name="how-it-works-output-cli-streams"></a>

次の JSON フラグメントは、Amazon Kinesis データストリームの宛先を作成する `CreateApplication` リクエストボディの `Outputs` セクションを示しています。

```
"Outputs": [
   {
       "DestinationSchema": {
           "RecordFormatType": "string"
       },
       "KinesisStreamsOutput": {
           "ResourceARN": "string",
           "RoleARN": "string"
       },
       "Name": "string"
   }
 
]
```

### Firehose 配信ストリーム出力を作成する
<a name="how-it-works-output-cli-firehose"></a>

次の JSON フラグメントは、Amazon Data Firehose 配信ストリームの宛先を作成する `CreateApplication` リクエスト本文の `Outputs` セクションを示しています。

```
"Outputs": [
   {
       "DestinationSchema": {
           "RecordFormatType": "string"
       },
       "KinesisFirehoseOutput": {
           "ResourceARN": "string",
           "RoleARN": "string"
       },
       "Name": "string"
   }
]
```

### Lambda 関数出力を作成する
<a name="how-it-works-output-cli-lambda"></a>

次の JSON フラグメントは、 AWS Lambda 関数の送信先を作成するための`CreateApplication`リクエスト本文の `Outputs`セクションを示しています。

```
"Outputs": [
   {
       "DestinationSchema": {
           "RecordFormatType": "string"
       },
       "LambdaOutput": {
           "ResourceARN": "string",
           "RoleARN": "string"
       },
       "Name": "string"
   }
]
```

# 出力としての Lambda 関数の使用
<a name="how-it-works-output-lambda"></a>

を送信先 AWS Lambda として使用すると、最終送信先に送信する前に SQL 結果の後処理をより簡単に実行できます。一般的な後処理タスクには次のものがあります。
+ 複数の行を 1 つのレコードに集約する
+ 現在の結果と過去の結果を組み合わせて、遅れて届くデータに対処する
+ 情報のタイプに基づいて異なる送信先に配信する
+ レコード形式の変換 (Protobuf への変換など)
+ 文字列操作または変換
+ 分析処理後のデータの強化
+ 地理空間ユースケースのカスタム処理
+ データ暗号化

Lambda 関数は、次のようなさまざまな AWS サービスやその他の送信先に分析情報を配信できます。
+ [Amazon Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/userguide/)
+ カスタム API
+ [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/)
+ [Amazon Aurora](http://aurora.apache.org/)
+ [Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/)
+ [Amazon Simple Notification Service (Amazon SNS)](https://docs.aws.amazon.com/sns/latest/dg/)
+ [Amazon Simple Queue Service](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/) (Amazon SQS)
+ [Amazon CloudWatch](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/)

Lambda アプリケーションの作成の詳細については、「[AWS Lambdaのご利用開始にあたって](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)」を参照してください。

**Topics**
+ [出力許可としての Lambda](#how-it-works-output-lambda-perms)
+ [出力メトリクスとしての Lambda](#how-it-works-output-lambda-metrics)
+ [出力イベント入力データモデルおよびレコードレスポンスモデルとしての Lambda](#how-it-works-output-lambda-model)
+ [Lambda 出力呼び出しの頻度](#how-it-works-output-lambda-frequency)
+ [出力として使用するための Lambda 関数の追加](#how-it-works-output-lambda-procedure)
+ [出力エラーとしてよく見られる Lambda](#how-it-works-output-lambda-troubleshooting)
+ [アプリケーションの送信先の Lambda 関数の作成](how-it-works-output-lambda-functions.md)

## 出力許可としての Lambda
<a name="how-it-works-output-lambda-perms"></a>

出力として Lambda を使用するには、アプリケーションの Lambda 出力 IAM ロールに次のアクセス許可ポリシーが必要です。

```
{
   "Sid": "UseLambdaFunction",
   "Effect": "Allow",
   "Action": [
       "lambda:InvokeFunction",
       "lambda:GetFunctionConfiguration"
   ],
   "Resource": "FunctionARN"
}
```

## 出力メトリクスとしての Lambda
<a name="how-it-works-output-lambda-metrics"></a>

Amazon CloudWatch を使用して、送信バイト数、成功および失敗などをモニタリングします。出力として Lambda を使用する Kinesis Data Analytics によって出力される CloudWatch メトリクスについては、「[Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。

## 出力イベント入力データモデルおよびレコードレスポンスモデルとしての Lambda
<a name="how-it-works-output-lambda-model"></a>

Kinesis Data Analytics 出力レコードを送信する場合、Lambda 関数は、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

### イベント入力データモデル
<a name="how-it-works-output-lambda-model-request"></a>

Kinesis Data Analytics は、次のリクエストモデルの出力関数として、アプリケーションから Lambda に出力レコードを継続的に送信します。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、出力要件 (最終的な送信先に送信する前のデータ変換など) を実行します。


| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| フィールド | 説明 | 
| --- | --- | 
| invocationId | Lambda 呼び出し ID (ランダム GUID)。 | 
| applicationArn | Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)。 | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| recordId | レコード ID (ランダム GUID) | 
| lambdaDeliveryRecordMetadata |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| data | Base64 でエンコードされた出力レコードのペイロード | 
| retryHint | 配信再試行回数 | 

**注記**  
`retryHint` は配信失敗ごとに増加する値です。この値は永続的に保持されず、アプリケーションが中断された場合にリセットされます。

### レコードレスポンスモデル
<a name="how-it-works-output-lambda-model-response"></a>

出力関数として `Ok` に (レコード ID と共に) 送信される各レコードは、 または `DeliveryFailed` のどちらかで確認される必要があり、次のパラメータを含める必要があります。それ以外の場合、Kinesis Data Analytics はそれらを配信失敗として扱います。


| フィールド | 説明 | 
| --- | --- | 
| レコード [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 
| recordId | レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。元のレコードの ID と確認されたレコードの ID との不一致は、配信失敗として扱われます。 | 
| result | レコード配信のステータス。以下の値を指定できます。[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/kinesisanalytics/latest/dev/how-it-works-output-lambda.html)  | 

## Lambda 出力呼び出しの頻度
<a name="how-it-works-output-lambda-frequency"></a>

Kinesis Data Analytics アプリケーションは、出力レコードをバッファして AWS Lambda 宛先関数を頻繁に呼び出します。
+ タンブリングウィンドウとしてデータ分析アプリケーション内の送信先アプリケーション内ストリームにレコードが出力された場合、送信 AWS Lambda 先関数はタンブリングウィンドウトリガーごとに呼び出されます。例えば、タンブリングウィンドウを 60 秒に設定してレコードを宛先のアプリケーション内ストリームに出力すると、Lambda 関数は、60 秒ごとに 1 回呼び出されます。
+ アプリケーション内で連続するクエリまたはスライディングウィンドウとしてレコードがアプリケーション内ストリームに出力される場合、Lambda 宛先関数は約 1 秒に 1 回呼び出されます。

**注記**  
[Lambda 関数あたりの呼び出しリクエストのペイロードサイズの制限](https://docs.aws.amazon.com/lambda/latest/dg/limits.html)が適用されます。これらの制限を超えると、出力レコードが分割され、複数の Lambda 関数呼び出しに分けて送信されます。

## 出力として使用するための Lambda 関数の追加
<a name="how-it-works-output-lambda-procedure"></a>

次の手順では、Kinesis Data Analytics アプリケーションの出力として Lambda 関数を追加する方法を示しています。

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics) で Managed Service for Apache Flink コンソールを開きます。

1. リストからアプリケーションを選択し、[**Application details**] を選択します。

1. [**宛先**] セクションで、[**Connect new destination**] を選択します。

1. [**宛先**] 項目に、[**AWS Lambda 関数**] を選択します。

1. [** AWS Lambdaにレコードを配信**] セクションで、既存の Lambda 関数とバージョンを選択するか、[**新規作成**] を選択します。

1. 新しい Lambda 関数を作成する場合は、次の操作を行います。

   1. 提供されているいずれかのテンプレートのいずれかを選択します。詳細については、[アプリケーションの送信先の Lambda 関数の作成](how-it-works-output-lambda-functions.md)。

   1. [**関数の作成**] ページが新しいブラウザタブで開きます。[**Name (名前)**] ボックスで、関数にわかりやすい名前を付けます (例: **myLambdaFunction**)。

   1. アプリケーションの後処理機能のテンプレートを更新します。Lambda 関数作成の詳細については、AWS Lambda 開発者ガイドの[入門ガイド](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)を参照してください。

   1. Kinesis Data Analytics コンソールの [**Lambda 関数**] リストで、先ほど作成した Lambda 関数を選択します。Lambda 関数のバージョンは **[\$1最新]** を選択します。

1. [**In-application stream**] セクションで、[**Choose an existing in-application stream**] を選択します。[**In-application stream name**] に、アプリケーションの出力ストリームを選択します。選択した出力ストリームからの結果は、Lambda 出力関数に送信されます。

1. 残りのフォームはデフォルト値のままにして、[**Save and continue**] を選択します。

アプリケーションはアプリケーション内ストリームから Lambda 関数にレコードを送信するようになりました。Amazon CloudWatch コンソールのデフォルトテンプレートの結果を確認できます。`AWS/KinesisAnalytics/LambdaDelivery.OkRecords` メトリクスをモニタリングして、Lambda 関数に配信されるレコードの数を確認します。

## 出力エラーとしてよく見られる Lambda
<a name="how-it-works-output-lambda-troubleshooting"></a>

以下は、Lambda 関数への配信が失敗する可能性のある一般的な理由です。
+ Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。
+ レスポンスにレコード ID、またはステータスフィールドのいずれかが欠落しています。
+ Lambda 関数のタイムアウトが Lambda 関数内のビジネスロジックを達成するのに十分ではありません。
+ Lambda 関数内のビジネスロジックは、すべてのエラーをキャッチしないため、処理されない例外のためにタイムアウトとバックプレッシャーが生じます。これらのメッセージは、「ポイズンピル」と呼ばれることが少なくありません。

データ配信が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から情報を得ることができます。
+ Kinesis Data Analytics アプリケーションの Output CloudWatch メトリクスとしてのLambda : 統計の中でも、特に成功と失敗の数を示します。詳細については、「[Amazon Kinesis Analytics Metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/aka-metricscollected.html)」を参照してください。
+ AWS Lambda 関数 CloudWatch メトリクスとログ。

# アプリケーションの送信先の Lambda 関数の作成
<a name="how-it-works-output-lambda-functions"></a>

Kinesis Data Analytics アプリケーションは、 AWS Lambda 関数を出力として使用できます。Kinesis Data Analytics はアプリケーションの送信先として使用する Lambda 関数を作成するテンプレートを提供します。これらのテンプレートは、アプリケーションからの後処理出力の開始点として使用します。

**Topics**
+ [Node.js での Lambda 関数の送信先の作成](#how-it-works-lambda-dest-nodejs)
+ [Python での Lambda 関数の送信先の作成](#how-it-works-lambda-dest-python)
+ [Java での Lambda 関数の送信先の作成](#how-it-works-lambda-dest-java)
+ [.NET での Lambda 関数の送信先の作成](#how-it-works-lambda-net)

## Node.js での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-dest-nodejs"></a>

Lambda 関数の宛先を Node.js で作成するための次のテンプレートがコンソールで利用できます。


| 出力ブループリントとしての Lambda | 言語とバージョン | 説明 | 
| --- | --- | --- | 
| kinesis-analytics-output | Node.js 12.x | Kinesis Data Analytics アプリケーションからカスタム送信先に出力レコードを配信します。 | 

## Python での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-dest-python"></a>

Lambda 関数の宛先を Python で作成するための次のテンプレートがコンソールで利用できます。


| 出力ブループリントとしての Lambda | 言語とバージョン | 説明 | 
| --- | --- | --- | 
| kinesis-analytics-output-sns | Python 2.7 | Kinesis Data Analytics アプリケーションから Amazon SNS に出力レコードを配信します。 | 
| kinesis-analytics-output-ddb | Python 2.7 | Kinesis Data Analytics アプリケーションから Amazon DynamoDB に出力レコードを配信します。 | 

## Java での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-dest-java"></a>

Java で Lambda 関数の送信先を作成するには、[Java イベント](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events)クラスを使用します。

次のコードは、Java を使用したサンプルの送信先 Lambda 関数を示しています。

```
public class LambdaFunctionHandler
        implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> {

    @Override
    public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event,
            Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>();
        KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint);
            // Add logic here to transform and send the record to final destination of your choice.
            response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok));
        });
        return response;
    }

}
```

## .NET での Lambda 関数の送信先の作成
<a name="how-it-works-lambda-net"></a>

.NET で Lambda 関数の送信先を作成するには、[.NET イベント](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)クラスを使用します。

次のコードは、C\$1 を使用したサンプルの送信先 Lambda 関数を示しています。

```
public class Function
    {
        public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsOutputDeliveryResponse
            {
                Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add logic here to send to the record to final destination of your choice.

                var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsOutputDeliveryResponse.OK
                };
                response.Records.Add(deliveredRecord);
            }
            return response;
        }
    }
```

事前処理および宛先の Lambda 関数を .NET で作成する場合の詳細については、「[https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)」を参照してください。

# アプリケーション出力を外部宛先で永続化する配信モデル
<a name="failover-checkpoint"></a>

Amazon Kinesis Data Analytics は、設定された宛先へのアプリケーション出力に「1 回以上」の配信モデルを使用します。アプリケーションの実行時に、Kinesis Data Analytics は内部チェックポイントを取ります。このチェックポイントは、出力レコードが宛先に配信されデータ損失がない場合のポイントを時間で示すものです。サービスでは必要に応じてチェックポイントを使用し、アプリケーション出力が少なくとも 1 回、設定された宛先に配信されたことを確認します。

通常の状況では、アプリケーションは受信データを継続的に処理します。Kinesis Data Analytics は、Kinesis データストリームや Firehose 配信ストリームなど、設定された宛先に出力を書き込みます。ただし、次の例に示すように、アプリケーションはときどき中断される可能性があります。
+ アプリケーションを停止して、後で再起動する場合。
+ 設定された宛先に Kinesis Data Analytics がアプリケーション出力を書き込むために必要な IAM ロールを削除した場合。IAM ロールがない場合、Kinesis Data Analytics にはユーザーの代わりに外部宛先に書き込むアクセス権限がありません。
+ ネットワークの停止またはその他の内部サービスの障害により、一時的にアプリケーションが実行を停止した場合。

アプリケーションが再起動すると、Kinesis Data Analytics は、障害の発生時またはその前の時点からの出力の処理および書き込みを続けます。これにより、設定された宛先にアプリケーション出力が確実に配信されます。

同じアプリケーション内ストリームから複数の宛先を設定したとします。アプリケーションが障害から復旧したら、Kinesis Data Analytics は、設定された宛先への永続的出力を、最も低速な宛先に配信された最後のレコードから再開します。これにより、同じ出力レコードが別の宛先に複数回配信される場合があります。この場合は、外部で、宛先での重複を処理する必要があります。

# エラー処理
<a name="error-handling"></a>

Amazon Kinesis Data Analytics は、API や SQL のエラーを直接ユーザーに返します。API オペレーションの詳細については、「[アクション](API_Operations.md)」を参照してください。SQL エラーの処理の詳細については、「[Amazon Kinesis Data Analytics SQL Reference](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)」を参照してください。

Amazon Kinesis Data Analytics は、`error_stream` というアプリケーション内エラーストリームを使用して、ランタイムエラーをレポートします。



## アプリケーション内エラーストリームを使用してエラーをレポートする
<a name="error-handling-errorstream"></a>

Amazon Kinesis Data Analytics は、`error_stream` というアプリケーション内エラーストリームを使用して、ランタイムエラーをレポートします。発生する可能性のあるエラーの例を以下に示します。
+ ストリーミングソースから読み取られたレコードが入力スキーマに適合していない。
+ アプリケーションコードがゼロでの除算を指定している。
+ 行が入れ替わっている (たとえば、ユーザーによって `ROWTIME` 値が変更されたレコードがストリームに現れると、レコードの順序が乱れます)。
+ ソースストリームのデータをスキーマで指定されたデータ型に変換することはできません (強制エラー)。変換できるデータ型の詳細については、「[JSON データ型から SQL データ型へのマッピング](sch-mapping.md#sch-mapping-datatypes)」を参照してください。

これらのエラーは、SQL コードでプログラム的に処理するか、外部宛先へのエラーストリームにデータを保持することをお勧めします。これには、アプリケーションに出力設定を追加する必要があります (「[アプリケーション出力の設定](how-it-works-output.md)」を参照)。アプリケーション内エラーストリームの動作の例については、「[例: アプリケーション内エラーストリームの確認](app-explore-error-stream.md)」を参照してください。

**注記**  
エラーストリームはシステムアカウントを使用して作成されるため、Kinesis Data Analytics アプリケーションはプログラムでエラーストリームにアクセスすることや、エラーストリームを変更することはできません。エラー出力を使用して、アプリケーションで発生する可能性のあるエラーを確認する必要があります。次に、アプリケーションの SQL コードを記述して、予期されるエラー条件を処理します。

### エラーストリームのスキーマ
<a name="error-handling-errorstream-schema"></a>

エラーストリームには、次のスキーマがあります。


****  

|  |  |  | 
| --- |--- |--- |
| *フィールド* | *データ型* | *Notes* (メモ) | 
| ERROR\$1TIME | TIMESTAMP | エラーが発生した時刻。 | 
| ERROR\$1LEVEL | VARCHAR(10) |  | 
| ERROR\$1NAME | VARCHAR(32) |  | 
| MESSAGE | VARCHAR(4096) |  | 
| DATA\$1ROWTIME | TIMESTAMP | 受信レコードの ROWTIME。 | 
| DATA\$1ROW | VARCHAR(49152) |  元の行の 16 進エンコードデータ。標準ライブラリを使用してこの値を 16 進数でデコードするか、この [Hex to String Converter](http://string-functions.com/hex-string.aspx) などのウェブリソースを使用できます。 | 
| PUMP\$1NAME | VARCHAR(128) |  `CREATE PUMP` で定義されている送信ポンプ。 | 

# アプリケーションを自動的にスケーリングしてスループットを向上させる
<a name="how-it-works-autoscaling"></a>

Amazon Kinesis Data Analytics は、ほとんどのシナリオでソースストリームのデータスループットとクエリの複雑さに対応するようにアプリケーションを柔軟に拡張します。Kinesis Data Analytics は、Kinesis 処理ユニット (KPU) の形式で容量をプロビジョニングします。単一の KPU には、メモリ (4 GB) とそれに対応するコンピューティングとネットワークがあります。

アプリケーションの KPU のデフォルト制限は 64 です。この制限の拡大をリクエストする方法については、「[Amazon サービスの制限](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html)」にある「**制限の拡大をリクエストするには**」を参照してください。

# タグ付けの使用
<a name="how-tagging"></a>

このセクションでは、Kinesis Data Analytics アプリケーションに key-value メタデータタグを追加する方法について説明します。これらのタグは以下の目的に使用できます。
+ 個々の Kinesis Data Analytics アプリケーションに対する請求を決定する。詳細については、「Billing and Cost Management ユーザーガイド」の「[AWS コスト配分タグの使用](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html)」を参照してください。
+ タグに基づいてアプリケーションリソースへのアクセスをコントロールする。詳細については、[ ユーザーガイド](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html)の*タグを使用したアクセス制御*を参照してください。
+ ユーザー定義の目的で。ユーザータグに基づいてアプリケーションの機能を定義できます。

タグ付けに関する以下の情報に注意してください。
+ アプリケーションタグの最大数にはシステムタグが含まれます。ユーザー定義のアプリケーションタグの最大数は 50 です。
+ アクションに含まれているタグリストで `Key` 値が重複している場合、サービスは `InvalidArgumentException` をスローします。

**Topics**
+ [アプリケーション作成時のタグの追加](#how-tagging-create)
+ [既存のアプリケーションに対するタグの追加または更新](#how-tagging-add)
+ [アプリケーションのタグの一覧表示](#how-tagging-list)
+ [アプリケーションからのタグの削除](#how-tagging-remove)

## アプリケーション作成時のタグの追加
<a name="how-tagging-create"></a>

タグの追加は、[CreateApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/API_CreateApplication.html) アクションの `tags` パラメータを使ってアプリケーションを作成する際に行います。

以下のリクエスト例では、`CreateApplication` リクエストの `Tags` ノードを示しています。

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

## 既存のアプリケーションに対するタグの追加または更新
<a name="how-tagging-add"></a>

[TagResource](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/API_TagResource.html) アクションを使用して、アプリケーションにタグを追加します。[UpdateApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/API_UpdateApplication.html) アクションを使用して、アプリケーションにタグを追加することはできません。

既存のタグを更新するには、既存のタグのものと同じキーを含むタグを追加します。

`TagResource` アクションの以下のリクエスト例では、新しいタグを追加するか、既存のタグを更新します。

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

## アプリケーションのタグの一覧表示
<a name="how-tagging-list"></a>

既存のタグを一覧表示するには、[ListTagsForResource](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/API_ListTagsForResource.html) アクションを使用します。

`ListTagsForResource` アクションの以下のリクエスト例では、アプリケーションのタグを一覧表示します。

```
{
   "ResourceARN": "arn:aws:kinesisanalytics:us-west-2:012345678901:application/MyApplication"
}
```

## アプリケーションからのタグの削除
<a name="how-tagging-remove"></a>

アプリケーションからタグを削除するには、[UntagResource](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/API_UntagResource.html) アクションを使用します。

`UntagResource` アクションの以下のリクエスト例では、アプリケーションからタグを削除します。

```
{
   "ResourceARN": "arn:aws:kinesisanalytics:us-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```