Amazon MSK - Amazon Timestream

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、こちらを参照してください。

Amazon MSK

Managed Service for Apache Flink を使用して Timestream for LiveAnalytics に Amazon MSK データを送信する

Managed Service for Apache Flink のサンプル Timestream データコネクタと類似するデータコネクタを構築することで、Amazon MSK から Timestream にデータを送信できます。詳細については、「Amazon Managed Service for Apache Flink」を参照してください。

Kafka Connect を使用して Amazon MSK データを Timestream for LiveAnalytics に送信する

Kafka Connect を使用して、時系列データを Amazon MSK から Timestream for LiveAnalytics に直接取り込むことができます。

Timestream 用のサンプル Kafka シンクコネクタを作成しました。また、データを Kafka トピックに発行するためのサンプル Apache JMeter テストプランを作成しました。これにより、データは Timestream Kafka シンクコネクタを介してトピックから Timestream for LiveAnalytics テーブルに流れます。これらのアーティファクトはすべて GitHub で入手できます。

注記

Timestream Kafka シンクコネクタを使用するための推奨バージョンは、Java 11 です。複数の Java バージョンがある場合は、Java 11 を JAVA_HOME 環境変数にエクスポートしてください。

サンプルアプリケーションを作成する

使用を開始するには、以下の手順を実行します。

  1. Timestream for LiveAnalytics で、kafkastream という名前のデータベースを作成します。

    詳細については、手順「データベースを作成する」を参照してください。

  2. Timestream for LiveAnalytics で、purchase_history という名前のテーブルを作成します。

    詳細については、手順「テーブルを作成する」を参照してください。

  3. 共有されている手順に従って、以下を作成します。

    • Amazon MSK クラスター

    • Kafka プロデューサークライアントマシンとして設定されている Amazon EC2 インスタンス

    • Kafka トピック

    詳細な手順については、kafka_ingestor プロジェクトの「Prerequisites」を参照してください。

  4. Timestream Kafka シンクコネクタリポジトリのクローンを作成します。

    詳細な手順については、GitHub の「Cloning a repository」を参照してください。

  5. プラグインコードをコンパイルします。

    詳細な手順については、GitHub の「Connector – Build from source」を参照してください。

  6. 説明されている手順に従って、次のファイルを S3 バケットにアップロードします。

    • /target ディレクトリの jar ファイル (kafka-connector-timestream->VERSION<-jar-with-dependencies.jar)

    • サンプル JSON スキーマファイル purchase_history.json

    詳細な手順については、「Amazon S3 ユーザーガイド」の「オブジェクトのアップロード」を参照してください。

  7. VPC エンドポイントを 2 つ作成します。これらのエンドポイントは、MSK コネクタが AWS PrivateLink を使用してリソースにアクセスするために使用されます。

    • Amazon S3 バケットにアクセスするためのエンドポイント

    • Timestream for LiveAnalytics テーブルにアクセスするためのエンドポイント。

    詳細な手順については、「VPC Endpoints」を参照してください。

  8. アップロードされた jar ファイルを使用してカスタムプラグインを作成します。

    詳細な手順については、「Amazon MSK デベロッパーガイド」の「カスタムプラグインの作成」を参照してください。

  9. Worker Configuration parameters」で説明されている JSON コンテンツを使用し、説明されている手順に従ってカスタムワーカー設定を作成します。

    詳細な手順については、「Amazon MSK デベロッパーガイド」のカスタムワーカー設定の作成についてのドキュメントを参照してください。

  10. サービス実行 IAM ロールを作成します。

    詳細な手順については、「IAM Service Role」を参照してください。

  11. 前のステップで作成したカスタムプラグイン、カスタムワーカー設定、サービス実行 IAM ロールと、サンプルコネクタ設定を使用して Amazon MSK コネクタを作成します。

    詳細な手順については、「Amazon MSK デベロッパーガイド」の「コネクタを理解する」を参照してください。

    以下の設定パラメータの値は、それぞれの値で更新してください。詳細については、「Connector Configuration parameters」を参照してください。

    • aws.region

    • timestream.schema.s3.bucket.name

    • timestream.ingestion.endpoint

    コネクタの作成が完了するまでに 5 分~10 分かかります。パイプラインの準備が完了すると、ステータスが Running に変わります。

  12. 作成された Kafka トピックにデータを書き込むための継続的なメッセージのストリームを発行します。

    詳細な手順については、「How to use it」を参照してください。

  13. 1 つ以上のクエリを実行して、データが Amazon MSK から MSK Connect、さらに Timestream for LiveAnalytics テーブルに送信されていることを確認します。

    詳細については、手順「クエリを実行する」を参照してください。

その他のリソース

ブログ「Real-time serverless data ingestion from your Kafka clusters into Amazon Timestream using Kafka Connect」では、Timestream for LiveAnalytics Kafka シンクコネクタを使用したエンドツーエンドのパイプラインの設定について説明しています。これには、Apache JMeter テストプランを使用して数千のサンプルメッセージを Kafka トピックに発行する Kafka プロデューサークライアントマシンから、Timestream for LiveAnalytics テーブルの取り込まれたレコードの検証までが含まれます。