

慎重に検討した結果、Amazon Kinesis Data Analytics for SQL アプリケーションを中止することにしました。

1. **2025 年 9 月 1** 日以降、Amazon Kinesis Data Analytics for SQL アプリケーションのバグ修正は提供されません。これは、今後の廃止によりサポートが制限されるためです。

2. **2025 年 10 月 15** 日以降、新しい Kinesis Data Analytics for SQL アプリケーションを作成することはできません。

3. **2026 年 1 月 27 日**以降、アプリケーションは削除されます。Amazon Kinesis Data Analytics for SQL アプリケーションを起動することも操作することもできなくなります。これ以降、Amazon Kinesis Data Analytics for SQL のサポートは終了します。詳細については、「[Amazon Kinesis Data Analytics for SQL アプリケーションのサポート終了](discontinuation.md)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# アプリケーション入力の設定
<a name="how-it-works-input"></a>

Amazon Kinesis Data Analytics アプリケーションでは、1 つのストリーミングソースから入力を受け取ることができます。また、オプションで 1 つのリファレンスデータソースを使用できます。詳細については、「[Amazon Kinesis Data Analytics for SQL Applications: 仕組み](how-it-works.md)」を参照してください。このトピックのセクションでは、アプリケーション入力ソースについて説明します。

**Topics**
+ [ストリーミングソースの設定](#source-streaming)
+ [リファレンスソースの設定](#source-reference)
+ [JSONPath の操作](about-json-path.md)
+ [SQL 入力列へのストリーミングソース要素のマッピング](sch-mapping.md)
+ [ストリーミングデータのスキーマ検出機能の使用](sch-dis.md)
+ [静的データに対するスキーマ検出機能の使用](sch-dis-ref.md)
+ [Lambda 関数を使用したデータの事前処理](lambda-preprocessing.md)
+ [スループットの増加に合わせた入力ストリームの並列処理](input-parallelism.md)

## ストリーミングソースの設定
<a name="source-streaming"></a>

アプリケーションを作成するときに、ストリーミングソースを指定します。アプリケーションを作成した後に入力を変更することもできます。Amazon Kinesis Data Analytics では、アプリケーションに対して以下のストリーミングソースがサポートされています。
+ Kinesis データストリーム 
+ Firehose 配信ストリーム

**注記**  
2023 年 9 月 12 日以降、Kinesis Data Analytics for SQL をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。`KinesisFirehoseInput` と一緒に Kinesis Data Analytics for SQL アプリケーションを使用している既存のお客様は、Kinesis Data Analytics を使用して既存のアカウント内で、引き続き `KinesisFirehoseInput` でアプリケーションを追加できます。既存のお客様で、`KinesisFirehoseInput` と一緒に Kinesis Data Analytics for SQL アプリケーションを使用して新規アカウントを作成される場合は、サービス制限拡大フォームを通してケースを作成できます。詳細については、[AWS サポート センター](https://console.aws.amazon.com/support/home#/)を参照してください。新しいアプリケーションを本番環境に移行する前に、必ずテストすることをお勧めします。

**注記**  
Kinesis データストリームが暗号化されている場合、Kinesis Data Analytics は暗号化されたストリームのデータにシームレスにアクセスし、それ以上の設定は必要ありません。Kinesis Data Analytics では、Kinesis Data Streams から読み取った暗号化されていないデータは保存されません。詳細については、「[Kinesis Data Streams 用のサーバー側の暗号化とは](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html)」を参照してください。

Kinesis Data Analytics は、ストリーミングソースに新しいデータがあるか継続的にポーリングし、入力設定に応じてアプリケーション内ストリームに取り込みます。

**注記**  
アプリケーションの入力として Kinesis ストリームを追加しても、ストリーム内のデータには影響を与えません。同じ Kinesis ストリームに Firehose 配信ストリームなどの別のリソースもアクセスした場合、Firehose 配信ストリームと Kinesis Data Analytics アプリケーションの両方が同じデータを受け取ります。ただし、スループットとスロットリングは影響を受ける可能性があります。

アプリケーションコードは、アプリケーション内ストリームをクエリできます。入力設定の一部として以下を指定します。
+ **ストリーミングソース** – ストリームの Amazon リソースネーム (ARN) と、Kinesis Data Analytics がユーザーに代わってストリームにアクセスするために引き受けることができる IAM ロールを指定します。
+ **アプリケーション内ストリーム名のプレフィックス** – アプリケーションを起動すると、Kinesis Data Analytics によって指定されたアプリケーション内ストリームが作成されます。この名前を使用して、アプリケーションコードでアプリケーション内ストリームにアクセスします。

  オプションで、ストリーミングリソースを複数のアプリケーション内ストリームにマッピングできます。詳細については、「[制限](limits.md)」を参照してください。この場合、Amazon Kinesis Data Analytics は、*prefix*`_001`、*prefix*`_002`、*prefix*`_003` などの名前で、アプリケーション内ストリームを指定された数だけ作成します。デフォルトでは、Kinesis Data Analytics はストリーミングソースを *prefix*`_001` という名前の 1 つのアプリケーション内ストリームにマッピングします。

  アプリケーション内ストリームに挿入できる行は、速度の制限があります。そのため、Kinesis Data Analytics では複数のアプリケーション内ストリームをサポートして、より高速にアプリケーションにレコードを届けます。アプリケーションがストリーミングソースのデータをアップし続けることができない場合は、並列処理ユニットを追加してパフォーマンスを向上させることができます。
+ **マッピングスキーマ** – ストリーミングソースでのレコード形式 (JSON、CSV) を指定します。また、ストリームの各レコードが、作成されるアプリケーション内ストリーム内の列にマッピングされる方法も指定します。ここで、列名とデータ型を指定します。

**注記**  
Kinesis Data Analytics は、入力アプリケーション内ストリームの作成時に、識別子 (ストリーム名および列名) に引用符を追加します。このストリームと列をクエリする場合は、完全一致 (大文字と小文字が正確に一致) を使用して引用符内を指定する必要があります。識別子の詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「[Identifiers](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html)」を参照してください。

Amazon Kinesis Data Analytics コンソールでアプリケーションを作成し、入力を設定できます。その後、コンソールは必要な API コールを行います。アプリケーション入力の設定は、新しいアプリケーション API の作成時、または既存のアプリケーションに入力設定を追加するときに行うことができます。詳細については、「[CreateApplication](API_CreateApplication.md)」および「[AddApplicationInput](API_AddApplicationInput.md)」を参照してください。以下は `Createapplication` API リクエストボディの入力設定部分です。

```
 "Inputs": [
        {
            "InputSchema": {
                "RecordColumns": [
                    {
                        "Mapping": "string",
                        "Name": "string",
                        "SqlType": "string"
                    }
                ],
                "RecordEncoding": "string",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": "string",
                            "RecordRowDelimiter": "string"
                        },
                        "JSONMappingParameters": {
                            "RecordRowPath": "string"
                        }
                    },
                    "RecordFormatType": "string"
                }
            },
            "KinesisFirehoseInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "KinesisStreamsInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "Name": "string"
        }
    ]
```

## リファレンスソースの設定
<a name="source-reference"></a>

オプションで、既存のアプリケーションにリファレンスデータソースを追加して、ストリーミングソースから送信されるデータを強化することもできます。リファレンスデータは Amazon S3 バケット内のオブジェクトとして保存する必要があります。アプリケーションが起動すると、Amazon Kinesis Data Analytics は Amazon S3 オブジェクトを読み取り、アプリケーション内リファレンステーブルを作成します。その後、アプリケーションコードでこれをアプリケーション内ストリームと結合できます。

サポートされている形式 (CSV、JSON) で、Amazon S3 オブジェクトにリファレンスデータを保存します。たとえば、アプリケーションで株注文を分析すると仮定します。ストリーミングソースには次に示すレコード形式があるとします。

```
Ticker, SalePrice, OrderId

AMZN     $700        1003
XYZ      $250        1004
...
```

この場合、会社名などの詳細を株価ティッカーに提供するリファレンスデータソースを保持することを検討する場合があります。

```
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
```

アプリケーションのリファレンスデータソースは、API またはコンソールで追加できます。Amazon Kinesis Data Analytics では、以下の API アクションで、リファレンスデータソースを管理します。
+  [AddApplicationReferenceDataSource](API_AddApplicationReferenceDataSource.md)
+ [UpdateApplication](API_UpdateApplication.md)

コンソールを使用してリファレンスデータを追加する方法の詳細については、「[例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する](app-add-reference-data.md)」を参照してください。

次の点に注意してください。
+ アプリケーションが実行されている場合、Kinesis Data Analytics はアプリケーション内リファレンステーブルを作成し、ただちにリファレンスデータをロードします。
+ アプリケーションが実行されていない (例えば、準備完了状態など) 場合、Kinesis Data Analytics は更新された入力設定を保存するだけです。アプリケーションの実行が始まると、Kinesis Data Analytics はリファレンスデータをテーブルとしてアプリケーションにロードします。

Kinesis Data Analytics がアプリケーション内リファレンステーブルを作成した後で、データを更新するとします。Amazon S3 オブジェクトを更新したり、別の Amazon S3 オブジェクトを使用したいという場合があるかもしれません。この場合は、[UpdateApplication](API_UpdateApplication.md) を明示的に呼び出すか、コンソールで [**アクション**]、[**リファレンスデータテーブルを同期**] の順に選択します。Kinesis Data Analytics では、アプリケーション内リファレンステーブルは自動的に更新されません。

リファレンスデータソースとして作成できる Amazon S3 オブジェクトには、サイズの制限があります。詳細については、「[制限](limits.md)」を参照してください。オブジェクトのサイズが制限を超えた場合には、Kinesis Data Analytics でデータをロードできません。アプリケーションの状態は実行中と表示されますが、データが読み込まれません。

リファレンスデータソースを追加する場合は、次の情報を指定します。
+ **S3 バケットおよびオブジェクトキー名** – バケット名およびオブジェクトキーに加えて、ユーザーの代わりにオブジェクトを読み取るために Kinesis Data Analytics が引き受けることができる IAM ロールも指定します。
+ **アプリケーション内リファレンステーブル名** – Kinesis Data Analytics がこのアプリケーション内テーブルを作成し、Amazon S3 オブジェクトを読み取ってそこに入力します。これは、アプリケーションコードで指定するテーブルの名前です。
+ **マッピングスキーマ** – レコード形式 (JSON, CSV)、Amazon S3 オブジェクトに保存されたデータのエンコードを記述します。また、各データ要素がどのようにアプリケーション内リファレンステーブルにマッピングされるかも記述します。

以下に、`AddApplicationReferenceDataSource` API リクエストの本文を示します。

```
{
    "applicationName": "string",
    "CurrentapplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
```