搭配 使用 OpenSearch 擷取管道 Amazon Managed Streaming for Apache Kafka - Amazon OpenSearch Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 使用 OpenSearch 擷取管道 Amazon Managed Streaming for Apache Kafka

您可以使用 Kafka 外掛程式,將資料從 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 擷取至 OpenSearch Ingestion 管道。透過 Amazon MSK,您可以建置和執行使用 Apache Kafka 處理串流資料的應用程式。OpenSearch Ingestion 使用 AWS PrivateLink 連線到 Amazon MSK。您可以從 Amazon MSK 和 Amazon MSK Serverless 叢集擷取資料。這兩個程序的唯一區別是設定管道之前必須採取的先決條件。

佈建的 Amazon MSK 先決條件

建立 OpenSearch Ingestion 管道之前,請執行下列步驟:

  1. 按照 Amazon Managed Streaming for Apache Kafka 開發人員指南中的建立叢集中的步驟建立 Amazon MSK 佈建叢集。 對於中介裝置類型,請選擇 t3類型以外的任何選項,因為 OpenSearch Ingestion 不支援這些選項。

  2. 叢集處於作用中狀態後,請遵循開啟多 VPC 連線中的步驟。

  3. 根據您的叢集和管道是否位於相同位置,遵循將叢集政策連接至 MSK 叢集中的步驟,以連接下列其中一個政策 AWS 帳戶。此政策允許 OpenSearch Ingestion 建立 Amazon MSK 叢集的 AWS PrivateLink 連線,並從 Kafka 主題讀取資料。請務必resource使用自己的 ARN 更新 。

    當您的叢集和管道位於相同的 時,適用下列政策 AWS 帳戶:

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" } ] }

    如果您的 Amazon MSK 叢集與管道 AWS 帳戶 位於不同的 中,請改為連接下列政策。請注意,跨帳戶存取僅適用於佈建的 Amazon MSK 叢集,而不適用於 Amazon MSK Serverless 叢集。的 AWS principal ARN 應該是您提供給管道組態的相同管道角色的 ARN:

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:region:msk-account-id:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:region:msk-account-id:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::pipeline-account-id:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:msk-account-id:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:msk-account-id:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:msk-account-id:group/cluster-name/*" ] } ] }
  4. 依照建立主題中的步驟建立 Kafka 主題。確定 BootstrapServerString是其中一個私有端點 (單一 VPC) 引導 URLs。根據 Amazon MSK 叢集擁有的區域數量3, 的值--replication-factor應為 2或 。的值--partitions應至少為 10

  5. 遵循生產和使用資料中的步驟來生產和使用資料。同樣地,請確定 BootstrapServerString是您的私有端點 (單一 VPC) 引導 URLs之一。

Amazon MSK Serverless 先決條件

建立 OpenSearch Ingestion 管道之前,請執行下列步驟:

  1. 按照 Amazon Managed Streaming for Apache Kafka 開發人員指南中的建立 MSK Serverless 叢集中的步驟建立 Amazon MSK Serverless 叢集

  2. 叢集處於作用中狀態後,請遵循將叢集政策連接至 MSK 叢集中的步驟,以連接下列政策。請務必resource使用自己的 ARN 更新 。

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" } ] }

    此政策允許 OpenSearch Ingestion 建立 Amazon MSK Serverless 叢集的 AWS PrivateLink 連線,並從 Kafka 主題讀取資料。當您的叢集和管道位於相同 時,此政策適用 AWS 帳戶,因為 Amazon MSK Serverless 不支援跨帳戶存取,因此此政策必須是 true。

  3. 依照建立主題中的步驟建立 Kafka 主題。請確定 BootstrapServerString是您的簡易身分驗證和安全層 (SASL) IAM 引導 URLs之一。根據 Amazon MSK Serverless 叢集擁有的區域數量3, 的值--replication-factor應為 2或 。的值--partitions應至少為 10

  4. 遵循生產和使用資料中的步驟來生產和使用資料。同樣地,請確定 BootstrapServerString是您的簡易身分驗證和安全層 (SASL) IAM 引導 URLs之一。

步驟 1:設定管道角色

在您設定 Amazon MSK 提供或無伺服器叢集之後,請在管道組態中要使用的管道角色中新增下列 Kafka 許可:

JSON
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:region:account-id:group/cluster-name/*" ] } ] }

步驟 2:建立管道

然後,您可以如下所示設定 OpenSearch Ingestion 管道,將 Kafka 指定為來源:

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "grouplambd-id" aws: msk: arn: "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-id" region: "us-west-2" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1es.amazonaws.com"] index: "index_name" aws_region: "region" aws_sigv4: true

您可以使用預先設定的 Amazon MSK 藍圖來建立此管道。如需詳細資訊,請參閱使用藍圖

步驟 3:(選用) 使用 AWS Glue 結構描述登錄檔

當您搭配 Amazon MSK 使用 OpenSearch Ingestion 時,您可以將 AVRO 資料格式用於 AWS Glue 結構描述登錄檔中託管的結構描述。使用AWS Glue 結構描述登錄檔,您可以集中探索、控制和發展資料串流結構描述。

若要使用此選項,請在管道組態type中啟用結構描述:

schema: type: "aws_glue"

您還必須在管道角色中提供 AWS Glue 讀取存取許可。您可以使用稱為 AWSGlueSchemaRegistryReadonlyAccess 的 AWS 受管政策。此外,您的登錄檔必須與 OpenSearch Ingestion 管道位於相同的 AWS 帳戶 和 區域。

步驟 4:(選用) 為 Amazon MSK 管道設定建議的運算單位 (OCUs)

每個運算單位每個主題都有一個取用者。中介裝置在指定主題的這些取用者之間平衡分割區。不過,當分割區數量大於取用者數量時,Amazon MSK 會在每個取用者上託管多個分割區。OpenSearch Ingestion 具有內建的自動擴展功能,可根據 CPU 用量或管道中待定記錄的數量來擴展或縮減規模。

為了獲得最佳效能,請將分割區分散到許多運算單位以進行平行處理。如果主題具有大量分割區 (例如,超過 96 個,也就是每個管道的最大 OCUs),建議您使用 1–96 OCUs 設定管道。這是因為它將視需要自動擴展。如果主題具有少量分割區 (例如,小於 96),請保持最大運算單位與分割區數量相同。

當管道有多個主題時,請選擇分割區數量最高的主題做為設定最大運算單位的參考。透過將具有一組新 OCUs的另一個管道新增至相同的主題和取用者群組,您可以幾乎線性地擴展輸送量。