Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、こちらを参照してください。
Amazon Kinesis
の使用Amazon Managed Service for Apache Flink
Managed Service for Apache Flink のサンプル Timestream データコネクタを使用して、Kinesis Data Streams から Timestream for LiveAnalytics にデータを送信できます。詳細については、「Amazon Managed Service for Apache Flink for Apache Flink」を参照してください。
EventBridge パイプを使用して Kinesis データを Timestream に送信する
EventBridge パイプを使用して、Kinesis ストリームから Amazon Timestream for LiveAnalytics テーブルにデータを送信できます。
パイプは、サポートされているソースとターゲット間のポイントツーポイント統合を目的としており、高度な変換とエンリッチメントをサポートしています。パイプを使用すると、イベント駆動型アーキテクチャを開発する際に専門知識や統合コードが不要になります。パイプをセットアップするには、ソースを選択し、オプションのフィルタリングを追加し、オプションのエンリッチメントを定義し、イベントデータのターゲットを選択します。
この統合により、データインジェストパイプラインを簡素化しながら、Timestream の時系列データ分析機能を活用できます。
EventBridge パイプを Timestream と併用すると次のようなメリットがあります。
リアルタイムのデータインジェスト: Kinesis から Timestream for LiveAnalytics にデータを直接ストリーミングすることで、リアルタイムの分析とモニタリングが可能になります。
シームレスな統合: EventBridge パイプを利用して、複雑なカスタム統合を必要とせずにデータフローを管理できます。
フィルタリングと変換の強化: 特定のデータ処理要件を満たすため、Kinesis レコードを Timestream に保存される前にフィルタリングまたは変換できます。
スケーラビリティ: 高スループットのデータストリームを処理し、組み込みの並列処理とバッチ処理機能を使用して効率的なデータ処理を実現します。
設定
Kinesis から Timestream にデータをストリーミングするように EventBridge パイプを設定するには、次の手順に従います。
Kinesis Stream を作成する
データを取り込むアクティブな Kinesis データストリームがあることを確認します。
Timestream データベースおよびテーブルを作成する
データを保存する Timestream データベースおよびテーブルを設定します。
EventBridge パイプを設定する:
ソース: ソースとして Kinesis ストリームを選択します。
ターゲット: ターゲットとして Timestream を選択します。
バッチ処理設定: バッチ処理ウィンドウとバッチサイズを定義してデータ処理を最適化し、レイテンシーを短縮します。
重要
パイプを設定するときは、いくつかのレコードを取り込んで、すべての設定の正確性をテストすることをお勧めします。パイプの作成が成功しても、パイプラインの正確性とエラーのないデータフローが保証されるわけではありません。マッピングの適用後に、不適切なテーブル、不適切な動的パスパラメータ、無効な Timestream レコードなどのランタイムエラーが発生し、実際のデータがパイプを通過するときに検出される可能性があります。
以下の設定により、データが取り込まれる速度が決まります。
BatchSize: Timestream for LiveAnalytics に送信されるバッチの最大サイズ。範囲は 0~100 です。スループットを最大化するには、この値を 100 にすることをお勧めします。
MaximumBatchingWindowInSeconds: バッチが Timestream for LiveAnalytics ターゲットに送信される前に batchSize を満たすまでの最大待機時間。この設定により、イベントの受信速度に応じた取り込みの遅延時間が決まります。この値を 10 秒未満にして、データを常にほぼリアルタイムで Timestream に送信することをお勧めします。
ParallelizationFactor: 各シャードから同時に処理するバッチの数です。スループットを最大化し、ほぼリアルタイムで取り込むには、最大値の 10 を使用することをお勧めします。
ストリームが複数のターゲットによって読み取られる場合は、拡張ファンアウトを使用してパイプに専用のコンシューマーを提供すると、高いスループットを実現できます。詳細については、「Kinesis Data Streams ユーザーガイド」で Kinesis Data Streams API を使用して拡張ファンアウトコンシューマーを開発する方法を確認してください。
注記
達成できる最大スループットは、アカウントあたりの同時パイプ実行数によって制限されます。
次の設定によってデータ損失を防止できます。
DeadLetterConfig: ユーザーエラーが原因でイベントを Timestream for LiveAnalytics に取り込むことができなかった場合にデータが失われないよう、DeadLetterConfig を常に設定することをお勧めします。
次の設定によってパイプのパフォーマンスを最適化することで、レコードが原因の速度低下やブロックを防ぐことができます。
MaximumRecordAgeInSeconds: この値より古いレコードは処理されず、DLQ に直接移動されます。この値は、ターゲット Timestream テーブルの設定済みのメモリストア保持期間を超えないように設定することをお勧めします。
MaximumRetryAttempts: レコードが DeadLetterQueue に送信されるまでの再試行回数。この値は 10 に設定することをお勧めします。これによって一時的な問題に対応できます。永続的な問題の場合、レコードは DeadLetterQueue に移動され、残りのストリームのブロックが解除されます。
OnPartialBatchItemFailure: 部分的なバッチ処理をサポートするソースの場合は、これを有効にして、DLQ にドロップ/送信する前に失敗レコードをさらに再試行するよう AUTOMATIC_BISECT に設定することをお勧めします。
設定例
以下は、Kinesis ストリームから Timestream テーブルにデータをストリーミングするように EventBridge パイプを設定する方法の例です。
例 Timestream の IAM ポリシー更新
例 Kinesis ストリーム設定
{ "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream", "SourceParameters": { "KinesisStreamParameters": { "BatchSize": 100, "DeadLetterConfig": { "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue" }, "MaximumBatchingWindowInSeconds": 5, "MaximumRecordAgeInSeconds": 1800, "MaximumRetryAttempts": 10, "StartingPosition": "LATEST", "OnPartialBatchItemFailure": "AUTOMATIC_BISECT" } } }
例 Timestream ターゲット設定
{ "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table", "TargetParameters": { "TimestreamParameters": { "DimensionMappings": [ { "DimensionName": "sensor_id", "DimensionValue": "$.data.device_id", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_type", "DimensionValue": "$.data.sensor_type", "DimensionValueType": "VARCHAR" }, { "DimensionName": "sensor_location", "DimensionValue": "$.data.sensor_loc", "DimensionValueType": "VARCHAR" } ], "MultiMeasureMappings": [ { "MultiMeasureName": "readings", "MultiMeasureAttributeMappings": [ { "MultiMeasureAttributeName": "temperature", "MeasureValue": "$.data.temperature", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "humidity", "MeasureValue": "$.data.humidity", "MeasureValueType": "DOUBLE" }, { "MultiMeasureAttributeName": "pressure", "MeasureValue": "$.data.pressure", "MeasureValueType": "DOUBLE" } ] } ], "SingleMeasureMappings": [], "TimeFieldType": "TIMESTAMP_FORMAT", "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS", "TimeValue": "$.data.time", "VersionValue": "$.approximateArrivalTimestamp" } } }
イベントの変換
EventBridge パイプを使用すると、データを Timestream に到達する前に変換できます。変換ルールを定義して、フィールド名の変更など、受信 Kinesis レコードの変更を行うことができます。
Kinesis ストリームに温度と湿度のデータが含まれているとします。EventBridge 変換を使用して、これらのフィールドを Timestream に挿入する前に名前変更できます。
ベストプラクティス
バッチ処理とバッファリング
書き込みレイテンシーと処理効率のバランスを取るように、バッチ処理ウィンドウとバッチサイズを設定します。
バッチ処理ウィンドウを使用して処理前に十分なデータを蓄積し、高頻度の小さなバッチによるオーバーヘッドを削減します。
並列処理
ParallelizationFactor 設定を使用して、特にスループットの高いストリームの同時実行を増やします。これにより、各シャードの複数のバッチを同時に処理できます。
データ変換
EventBridge パイプの変換機能を活用して、レコードを Timestream に保存する前にフィルタリングおよび強化します。これにより、データを分析要件に合わせることができます。
セキュリティ
EventBridge パイプに使用される IAM ロールに、Kinesis からの読み込みと Timestream への書き込みに必要なアクセス許可があることを確認します。
暗号化とアクセス制御手段を使用して、転送中のデータおよび保管中のデータを保護します。
デバッグの失敗
-
パイプの自動無効化
ターゲットが存在しないか、アクセス許可の問題がある場合、パイプは約 2 時間後に自動的に無効になります。
-
Throttles
パイプには、スロットルが減少するまで自動的にバックオフして再試行する機能があります。
-
ログの有効化
ログを ERROR レベルで有効にし、実行データを含めて、失敗に関する追加情報を取得することをお勧めします。失敗が発生した場合、これらのログには Timestream との間で送受信されたリクエスト/レスポンスが記録されます。これにより、関連するエラーを理解し、必要に応じて修正後にレコードを再処理できます。
モニタリング
データフローの問題を検出するために、以下に関するアラームを設定することをお勧めします。
ソースのレコードの最大保存期間
GetRecords.IteratorAgeMilliseconds
パイプの失敗メトリクス
ExecutionFailedTargetStageFailed
Timestream Write API エラー
UserErrors
その他のモニタリングメトリクスについては、「EventBridge ユーザーガイド」の「EventBridge のモニタリング」を参照してください。