

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Data Firehose でソースデータを変換する
<a name="data-transformation"></a>

Amazon Data Firehoseでは、Lambda 関数を呼び出して、着信ソースデータを変換してから宛先に配信できます。Amazon Data Firehose のデータ変換は、Firehose ストリームの作成時に有効にすることができます。

## データ変換フローを理解する
<a name="data-transformation-flow"></a>

Firehose のデータ変換を有効にすると、Firehose は着信データをバッファリングします。バッファリングサイズのヒントの範囲は 0.2 MB～3 MB です。デフォルトの Lambda バッファリングサイズのヒントは、Splunk と Snowflake を除くすべての宛先で 1 MB です。Splunk と Snowflake の場合、デフォルトのバッファリングのヒントは 256 KB です。Lambda バッファリング間隔のヒントの範囲は 0～900 秒です。デフォルトの Lambda バッファリング間隔のヒントは、Snowflake を除くすべての宛先で 60 秒です。Snowflake の場合、デフォルトのバッファリングのヒントの間隔は 30 秒です。バッファリングサイズを調整するには、[CreateDeliveryStream](https://docs.aws.amazon.com/firehose/latest/APIReference/API_CreateDeliveryStream.html) または [UpdateDestination](https://docs.aws.amazon.com/firehose/latest/APIReference/API_UpdateDestination.html) API の [ProcessingConfiguration](https://docs.aws.amazon.com/firehose/latest/APIReference/API_ProcessingConfiguration.html) パラメータを、`BufferSizeInMBs` および `IntervalInSeconds` という [ProcessorParameter](https://docs.aws.amazon.com/firehose/latest/APIReference/API_ProcessorParameter.html) で設定します。次に、Firehose は、同期呼び出しモードを使用して、バッファされた各バッチと同期的に指定された Lambda AWS Lambda 関数を呼び出します。変換されたデータは、Lambda から Firehose に送信されます。その後、変換されたデータは、指定された宛先のバッファリングサイズとバッファリング間隔のいずれかに到達したときに、Firehose より宛先に配信されます。到達順序は関係ありません。

**重要**  
Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。

## Lambda の呼び出し期間
<a name="data-transformation-execution-duration"></a>

Amazon Data Firehose では、最大 5 分の Lambda の呼び出し時間がサポートされます。Lambda 関数が完了するまでに 5 分以上かかると、次のエラーが発生します。Firehose は AWS Lambda を呼び出すときにタイムアウトエラーを検出しました。サポートされている関数のタイムアウトは最大 5 分です。

このようなエラーが発生した場合の Amazon Data Firehose による処理の詳細については、「[データ変換の失敗を処理する](data-transformation-failure-handling.md)」を参照してください。

# データ変換に必要なパラメータ
<a name="data-transformation-status-model"></a>

Lambda からのすべての変換されたレコードには、次のパラメータが含まれる必要があります。含まれない場合、Amazon Data Firehose はそれらのレコードを拒否し、データ変換の失敗として処理します。

------
#### [ For Kinesis Data Streams and Direct PUT ]

Lambda から変換されたレコードすべてで、次のパラメータが必要です。
+ `recordId` – レコード ID は呼び出し時に Amazon Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。
+ `result` – レコードのデータ変換のステータス。指定できる値は次のとおりです: `Ok` (レコードが正常に変換された)、`Dropped` (レコードが処理ロジックによって意図的に削除された)、`ProcessingFailed` (レコードを変換できなかった)。レコードのステータスが `Ok` または `Dropped` の場合、Amazon Data Firehose はレコードが正常に処理されたとみなします。それ以外の場合、Amazon Data Firehose はそれが正常に処理できなかったとみなします。
+ `data` – base64 エンコード後の変換されたデータペイロード。

  以下は、Lambda の結果の出力例です。

  ```
   {
      "recordId": "<recordId from the Lambda input>",
      "result": "Ok",
      "data": "<Base64 encoded Transformed data>"
  }
  ```

------
#### [ For Amazon MSK ]

Lambda から変換されたレコードすべてで、次のパラメータが必要です。
+ `recordId` – レコード ID は呼び出し時に Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。
+ `result` – レコードのデータ変換のステータス。指定できる値は次のとおりです: `Ok` (レコードが正常に変換された)、`Dropped` (レコードが処理ロジックによって意図的に削除された)、`ProcessingFailed` (レコードを変換できなかった)。レコードのステータスが `Ok` または `Dropped` の場合、Firehose はレコードが正常に処理されたとみなします。それ以外の場合、Firehose はそれが正常に処理できなかったとみなします。
+ `KafkaRecordValue` – base64 エンコード後の変換されたデータペイロード。

  以下は、Lambda の結果の出力例です。

  ```
   {
      "recordId": "<recordId from the Lambda input>",
      "result": "Ok",
      "kafkaRecordValue": "<Base64 encoded Transformed data>"
  }
  ```

------

# サポートされている Lambda ブループリント
<a name="lambda-blueprints"></a>

これらの設計図は、 AWS Lambda 関数を作成して使用し、Amazon Data Firehose データストリーム内のデータを変換する方法を示しています。

**AWS Lambda コンソールで使用可能なブループリントを表示するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/lambda/](https://console.aws.amazon.com/lambda/) で AWS Lambda コンソールを開きます。

1. [**関数の作成**]、[**Use a blueprint (設計図の使用)**] の順に選択します。

1. **[ブループリント]** フィールドで、キーワード `firehose` で検索して Amazon Data Firehose Lambda ブループリントを見つけます。

ブループリントのリスト:
+ **Amazon Data Firehose ストリームに送信されたレコードを処理する (Node.js、Python)**

  このブループリントは、 AWS Lambda を使用して Firehose データストリーム内のデータを処理する方法の基本的な例を示しています。

  *最終リリース日:* 2016 年 11 月 

  *リリースノート:* なし
+ **Firehose に送信された CloudWatch Logs を処理する**

  このブループリントは非推奨です。このブループリントは使用しないでください。解凍された CloudWatch Logs データが 6 MB (Lambda の制限) を超えると、高額な料金が発生する可能性があります。Firehose に送信された CloudWatch Logs の処理については、「[Writing to Firehose Using CloudWatch Logs](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html)」を参照してください。
+ **Syslog 形式の Amazon Data Firehose ストリームレコードを JSON (Node.js) に変換する**

  このブループリントは、RFC3164 Syslog 形式の入力レコードを JSON に変換する方法を示しています。

  *最終リリース日:* 2016 年 11 月 

  *リリースノート:* なし 

**で使用できるブループリントを表示するには AWS Serverless Application Repository**

1. [AWS Serverless Application Repository](https://aws.amazon.com/serverless/serverlessrepo) に移動します。

1. [**すべてのアプリケーションを参照**] を選択します。

1. [**アプリケーション**] フィールドで、キーワード `firehose` を検索します。

設計図を使用せずに Lambda 関数を作成することもできます。[AWS 「Lambda の開始方法](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)」を参照してください。

# データ変換の失敗を処理する
<a name="data-transformation-failure-handling"></a>

ネットワークタイムアウトのために、または Lambda 呼び出しの制限に達したために、Lambda 関数呼び出しが失敗した場合、Amazon Data Firehose は呼び出しをデフォルトで 3 回再試行します。呼び出しが成功しなければ、Amazon Data Firehose はそのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。[CreateDeliveryStream](https://docs.aws.amazon.com/firehose/latest/APIReference/API_CreateDeliveryStream.html) または `[UpdateDestination](https://docs.aws.amazon.com/firehose/latest/APIReference/API_UpdateDestination.html)` API を使用して、再試行オプションを指定または上書きできます。このタイプの失敗の場合、呼び出しエラーログを Amazon CloudWatch Logs に出力できます。詳細については、「[CloudWatch Logs を使用して Amazon Data Firehose をモニタリングする](monitoring-with-cloudwatch-logs.md)」を参照してください。

レコードのデータ変換のステータスが `ProcessingFailed` の場合、Amazon Data Firehose はそのレコードを処理失敗として扱います。このタイプの失敗の場合、エラーログを Lambda 関数から Amazon CloudWatch Logs に出力できます。詳細については、*AWS Lambda 開発者ガイド*の「[AWS Lambdaの Amazon CloudWatch Logs にアクセス](https://docs.aws.amazon.com/lambda/latest/dg/monitoring-functions-logs.html)」を参照してください。

データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの `processing-failed` フォルダに配信されます。レコードの形式は以下のとおりです。

```
{
    "attemptsMade": "count",
    "arrivalTimestamp": "timestamp",
    "errorCode": "code",
    "errorMessage": "message",
    "attemptEndingTimestamp": "timestamp",
    "rawData": "data",
    "lambdaArn": "arn"
}
```

`attemptsMade`  
呼び出しリクエストの試行回数。

`arrivalTimestamp`  
Amazon Data Firehose がレコードを受信した時間。

`errorCode`  
Lambda から返された HTTP エラーコード。

`errorMessage`  
Lambda から返されたエラーメッセージ。

`attemptEndingTimestamp`  
Amazon Data Firehose が Lambda 呼び出しの試行を停止した時間。

`rawData`  
base64 エンコード後のレコードデータ。

`lambdaArn`  
 Lambda 関数の Amazon リソースネーム (ARN)。

# ソースレコードのバックアップ
<a name="data-transformation-source-record-backup"></a>

Amazon Data Firehose は、変換されたレコードを宛先に配信すると同時に、変換されなかったすべてのレコードを S3 バケットにバックアップできます。ソースレコードのバックアップは、Firehose ストリームの作成または更新時に有効にすることができます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。