OpenSearch Ingestion パイプラインを Amazon Managed Streaming for Apache Kafka で使用する - Amazon OpenSearch Service

OpenSearch Ingestion パイプラインを Amazon Managed Streaming for Apache Kafka で使用する

Kafka プラグインを使用して Amazon Managed Streaming for Apache Kafka (Amazon MSK) から OpenSearch Ingestion パイプラインにデータを取り込むことができます。Amazon MSK を使用すると、Apache Kafka をストリーミングデータの処理に使用するアプリケーションを構築および実行できます。OpenSearch Ingestion は AWS PrivateLink を使用して Amazon MSK に接続します。Amazon MSK クラスターと Amazon MSK Serverless クラスターの両方からデータを取り込むことができます。2 つのプロセスの違いは、パイプラインをセットアップする前に実行する必要がある前提条件の手順だけです。

プロビジョニングされた Amazon MSK の前提条件

OpenSearch Ingestion パイプラインを作成する前に、次の手順を実行します。

  1. Amazon Managed Streaming for Apache Kafka 開発者ガイドの「クラスターの作成」の手順に従って Amazon MSK のプロビジョニングされたクラスターを作成します。[ブローカータイプ] では、t3 タイプは OpenSearch Ingestion ではサポートされていないため、それ以外のオプションを選択します。

  2. クラスターのステータスが Active になったら、「マルチ VPC 接続を有効にする」の手順に従います。

  3. クラスターとパイプラインが同じ AWS アカウント にあるかどうかに応じて、「クラスターポリシーを MSK クラスターにアタッチする」のステップに従い、以下のポリシーのいずれかをアタッチします。このポリシーでは、OpenSearch Ingestion は Amazon MSK クラスターへの AWS PrivateLink 接続を作成して、Kafka トピックからデータを読み取ることができます。必ず独自の ARN で resource を更新してください。

    クラスターとパイプラインが同じ AWS アカウント にある場合は、次のポリシーが適用されます。

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" } ] }

    Amazon MSK クラスターがパイプラインとは異なる AWS アカウントにある場合は、次のポリシーをアタッチします。クロスアカウントアクセスは、プロビジョニングされた Amazon MSK クラスターでのみ可能であり、Amazon MSK Serverless クラスターではできないことに注意してください。AWS principal の ARN は、パイプライン設定に指定するのと同じパイプラインロールの ARN である必要があります。

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::444455556666:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*" ] } ] }
  4. トピックの作成」の手順に従って Kafka トピックを作成します。BootstrapServerString がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。--replication-factor の値は、Amazon MSK クラスターのゾーンの数に応じて 2 または 3 を指定します。--partitions の値は少なくとも 10 である必要があります。

  5. データの生成と消費」の手順に従って、データを生成して使用します。BootstrapServerString がプライベートエンドポイント (単一 VPC) のブートストラップ URL の 1 つであることを確認してください。

Amazon MSK Serverless の前提条件

OpenSearch Ingestion パイプラインを作成する前に、次の手順を実行します。

  1. Amazon Managed Streaming for Apache Kafka 開発者ガイドの「MSK Serverless クラスターの作成」の手順に従って Amazon MSK Serverless クラスターを作成します。

  2. クラスターのステータスが [アクティブ] になったら、「クラスターポリシーを MSK クラスターにアタッチする」の手順に従って、次のポリシーをアタッチします。必ず独自の ARN で resource を更新してください。

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" } ] }

    このポリシーでは、OpenSearch Ingestion は Amazon MSK Serverless クラスターへの AWS PrivateLink 接続を作成して、Kafka トピックからデータを読み取ることができます。このポリシーは、クラスターとパイプラインが同じ AWS アカウントにある場合に適用されます。これは、Amazon MSK Serverless がクロスアカウントアクセスをサポートしていないため、必ず該当します。

  3. トピックの作成」の手順に従って Kafka トピックを作成します。BootstrapServerString が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URL の 1 つであることを確認します。--replication-factor の値は、Amazon MSK Serverless クラスターのゾーンの数に応じて 2 または 3 を指定します。--partitions の値は少なくとも 10 である必要があります。

  4. データの生成と消費」の手順に従って、データを生成して使用します。ここでも、BootstrapServerString が Simple Authentication and Security Layer (SASL) IAM ブートストラップ URL の 1 つであることを確認します。

ステップ 1: パイプラインロールを設定する

Amazon MSK をプロビジョニングし、サーバーレスクラスターを設定したら、パイプライン設定で使用するパイプラインロールに次の Kafka アクセス許可を追加します。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:group/cluster-name/*" ] } ] }

ステップ 2: パイプラインを作成する

そして、ソースとして Kafka を指定する OpenSearch Ingestion パイプラインを次のように設定できます。

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "grouplambd-id" aws: msk: arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" region: "us-west-2" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"] index: "index_name" aws_region: "region" aws_sigv4: true

事前設定された Amazon MSK ブループリントを使用して、このパイプラインを作成できます。詳細については、「ブループリントの使用」を参照してください。

ステップ 3: (オプション) AWS Glue スキーマレジストリを使用する

Amazon MSK で OpenSearch Ingestion を使用するとき、AWS Glue スキーマレジストリでホストされているスキーマに AVRO データ形式を使用できます。AWS Glue スキーマレジストリを使用すると、データストリームスキーマを一元的に検出、制御、および展開できます。

このオプションを使用するには、パイプライン設定で type スキーマを有効にします。

schema: type: "aws_glue"

パイプラインロールで AWS Glue に読み取りアクセス許可を付与する必要もあります。AWSGlueSchemaRegistryReadonlyAccess という AWS マネージドポリシーを使用できます。さらに、レジストリは OpenSearch Ingestion パイプラインと同じ AWS アカウントとリージョンにある必要があります。

ステップ 4: (オプション) Amazon MSK パイプラインの推奨コンピューティングユニット (OCU) を設定する

各コンピューティングユニットには、トピックごとに 1 つのコンシューマーがあります。ブローカーは、特定のトピックについて、これらのコンシューマー間でパーティションのバランスを取ります。ただし、パーティションの数がコンシューマーの数よりも多い場合、Amazon MSK は各コンシューマーで複数のパーティションをホストします。OpenSearch Ingestion には、CPU 使用率またはパイプライン内の保留中のレコード数に基づいてスケールアップまたはスケールダウンする自動スケーリングが組み込まれています。

最適なパフォーマンスを得るには、パーティションを多くのコンピューティングユニットに分散して並列処理を行います。トピックに多くのパーティションがある場合 (パイプラインあたりの最大数である 96 以上の OCU がある場合など)、1 ~ 96 個の OCU でパイプラインを設定することをお勧めします。これは、必要に応じて自動的にスケールするためです。トピックのパーティション数が少ない場合 (96 未満の場合など)、最大コンピューティングユニットをパーティションの数と同じにします。

パイプラインに複数のトピックがある場合は、最大コンピューティングユニットを設定する参照としてパーティション数が最も多いトピックを選択します。新しい OCU セットを含むパイプラインを同じトピックとコンシューマーグループに追加すると、スループットをほぼ直線的にスケールすることができます。