本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Amazon OpenSearch Ingestion 中的管道功能概觀
Amazon OpenSearch Ingestion 佈建管道,其中包含來源、緩衝區、零或多個處理器,以及一或多個接收器。擷取管道採用 Data Prepper 做為資料引擎。如需管道各種元件的概觀,請參閱 Amazon OpenSearch Ingestion 中的重要概念。
下列各節提供 Amazon OpenSearch Ingestion 中一些最常用功能的概觀。
注意
這不是可供管道使用的完整功能清單。如需所有可用管道功能的完整文件,請參閱 Data Prepper 文件
持久性緩衝
持久性緩衝會將您的資料儲存在跨多個可用區域的磁碟型緩衝區中,以增強資料耐久性。您可以使用持久性緩衝,從所有支援的推送型來源擷取資料,而無需設定獨立的緩衝。這些來源包括日誌、追蹤和指標的 HTTP 和 OpenTelemetry。若要啟用持久性緩衝,請在建立或更新管道時選擇啟用持久性緩衝。如需詳細資訊,請參閱建立 Amazon OpenSearch 擷取管道。
OpenSearch Ingestion 會動態決定用於持久性緩衝、考量資料來源、串流轉換和目的地的 OCUs 數量。由於它將一些 OCUs 配置為緩衝,您可能需要增加最小和最大 OCUs以維持相同的擷取輸送量。管道會在緩衝區中保留資料長達 72 小時。
如果您為管道啟用持久性緩衝,預設請求承載大小上限如下:
-
HTTP 來源 – 10 MB
-
OpenTelemetry 來源 – 4 MB
對於 HTTP 來源,您可以將承載大小上限提高到 20 MB。請求承載大小包含整個 HTTP 請求,通常包含多個事件。每個事件不能超過 3.5 MB。
具有持久性緩衝的管道會將設定的管道單位分割為運算和緩衝單位。如果管道使用 grok、key-value 或 split string 等 CPU 密集型處理器,則會以 1:1 buffer-to-compute比率配置單位。否則,它會以 3:1 的比例配置它們,一律偏好運算單位。
例如:
-
具有 grok 和最多 2 個單位的管道 – 1 個運算單位和 1 個緩衝單位
-
具有 grok 和最多 5 個單位的管道 – 3 個運算單位和 2 個緩衝單位
-
沒有處理器和最多 2 個單位的管道 – 1 個運算單位和 1 個緩衝單位
-
沒有處理器和最多 4 個單位的管道 – 1 個運算單位和 3 個緩衝單位
-
具有 grok 和最多 5 個單位的管道 – 2 個運算單位和 3 個緩衝單位
根據預設,管道會使用 AWS 擁有的金鑰 來加密緩衝區資料。這些管道不需要管道角色的任何其他許可。
或者,您可以指定客戶受管金鑰,並將下列 IAM 許可新增至管道角色:
如需更多資訊,請參閱 AWS Key Management Service 開發人員指南中的客戶受管金鑰。
注意
如果您停用持久性緩衝,您的管道會開始完全在記憶體內緩衝上執行。
分割
您可以設定 OpenSearch Ingestion 管道,將傳入事件分割為子管道,讓您在相同的傳入事件上執行不同類型的處理。
下列範例管道會將傳入的事件分割成兩個子管道。每個子管道都會使用自己的處理器來充實和操作資料,然後將資料傳送至不同的 OpenSearch 索引。
version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"
鏈接
您可以將多個子管道鏈結在一起,以便在區塊中執行資料處理和擴充。換言之,您可以在一個子管道中以特定處理功能擴充傳入事件,然後將其傳送至另一個子管道,以使用不同的處理器進行額外擴充,最後將其傳送至其 OpenSearch 接收器。
在下列範例中,log_pipeline
子管道會使用一組處理器來充實傳入日誌事件,然後將事件傳送至名為 的 OpenSearch 索引enriched_logs
。管道會將相同的事件傳送至log_advanced_pipeline
子管道,其會處理該事件並將其傳送至名為 的不同 OpenSearch 索引enriched_advanced_logs
。
version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"
無效字母佇列
無效字母佇列 (DLQs) 是管道無法寫入目的地的事件目的地。在 OpenSearch Ingestion 中,您必須指定具有適當寫入許可的 Amazon S3 儲存貯體,以用作 DLQ。您可以將 DLQ 組態新增至管道中的每個接收器。當管道遇到寫入錯誤時,它會在設定的 S3 儲存貯體中建立 DLQ 物件。DLQ 物件存在於 JSON 檔案中,做為失敗事件的陣列。
當符合下列任一條件時,管道會將事件寫入 DLQ:
-
OpenSearch 接收器的重試次數上限已用盡。此設定至少需要 16 個 OpenSearch Ingestion。
-
目的地因為錯誤條件而拒絕事件。
組態
若要設定子管道的無效字母佇列,請在設定目的地時選擇啟用 S3 DLQ。然後,指定佇列的必要設定。如需詳細資訊,請參閱 Data Prepper DLQ 文件中的組態
寫入此 S3 DLQ 的檔案具有下列命名模式:
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
如需手動設定管道角色以允許存取 DLQ 寫入的 S3 儲存貯體的說明,請參閱 寫入 Amazon S3 或無效字母佇列的許可。
範例
請考慮下列範例 DLQ 檔案:
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
以下是無法寫入目的地並傳送至 DLQ S3 儲存貯體以進行進一步分析的資料範例:
Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"
索引管理
Amazon OpenSearch Ingestion 有許多索引管理功能,包括下列項目。
建立索引
您可以在管道接收器中指定索引名稱,OpenSearch Ingestion 會在佈建管道時建立索引。如果索引已存在,管道會使用它來為傳入事件編製索引。如果您停止並重新啟動管道,或更新其 YAML 組態,管道會嘗試在尚未存在時建立新索引。管道永遠無法刪除索引。
佈建管道時,下列範例接收器會建立兩個索引:
sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs
產生索引名稱和模式
您可以使用傳入事件欄位中的變數來產生動態索引名稱。在接收器組態中,使用 格式string${}
來發出字串插補訊號,並使用 JSON 指標從事件擷取欄位。的選項index_type
為 custom
或 management_disabled
。由於 custom
OpenSearch 網域index_type
預設為 management_disabled
,而 OpenSearch Serverless 集合預設為 ,因此可以保持取消設定。
例如,下列管道會從傳入事件中選取 metadataType
欄位,以產生索引名稱。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}"
下列組態會持續每天或每小時產生新的索引。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
索引名稱也可以是純字串,並以日期時間模式做為尾碼,例如 my-index-%{yyyy.MM.dd}
。當接收器將資料傳送至 OpenSearch 時,它會將日期時間模式取代為 UTC 時間,並為每天建立新的索引,例如 my-index-2022.01.25
。如需詳細資訊,請參閱 DateTimeFormatter
此索引名稱也可以是格式化字串 (有或沒有日期時間模式尾碼),例如 my-${index}-name
。當接收將資料傳送至 OpenSearch 時,它會將 "${index}"
部分取代為正在處理的事件中的 值。如果格式為 "${index1/index2/index3}"
,則會將 欄位取代index1/index2/index3
為事件中的值。
產生文件 IDs
管道可以在將文件編製索引至 OpenSearch 時產生文件 ID。它可以從傳入事件中的欄位推斷這些文件 IDs。
此範例使用來自傳入事件的 uuid
欄位來產生文件 ID。
pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" "document_id": "uuid"
在下列範例中,新增項目other_field
傳入事件中的欄位 uuid
和 ,以產生文件 ID。
create
動作可確保具有相同 IDs的文件不會遭到覆寫。管道會捨棄重複的文件,而沒有任何重試或 DLQ 事件。對於使用此動作的管道作者來說,這是合理的期望,因為目標是避免更新現有的文件。
pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id: "my_doc_id"
您可能想要將事件的文件 ID 設定為子物件中的欄位。在下列範例中,OpenSearch 接收器外掛程式會使用 子物件info/id
來產生文件 ID。
sink: - opensearch: ... document_id: info/id
鑑於下列事件,管道將產生 _id
欄位設定為 的文件json001
:
{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }
產生路由 IDs
您可以使用 OpenSearch 目的地外掛程式中的 routing_field
選項,將文件路由屬性 (_routing
) 的值設定為來自傳入事件的值。
路由支援 JSON 指標語法,因此巢狀欄位也可用,而不只是最上層欄位。
sink: - opensearch: ... routing_field: metadata/id document_id: id
鑑於下列事件,外掛程式會產生將 _routing
欄位設定為 的文件abcd
:
{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }
如需建立管道可在索引建立期間使用的索引範本的說明,請參閱索引範本
End-to-end確認
OpenSearch Ingestion 使用end-to-end確認,追蹤其在無狀態管道中從來源到目的地的交付,以確保資料的耐用性和可靠性。
注意
目前,只有 S3 來源
透過end-to-end確認,管道來源外掛程式會建立確認集來監控事件批次。當這些事件成功傳送到其目的地時,會收到正面的確認,或當任何事件無法傳送到其目的地時,會收到負面的確認。
如果管道元件發生故障或當機,或來源無法接收確認,則來源會逾時,並採取必要動作,例如重試或記錄故障。如果管道已設定多個接收器或多個子管道,則只有在事件傳送至所有子管道中的所有接收器之後,才會傳送事件層級確認。如果接收器已設定 DLQ,end-to-end確認也會追蹤寫入 DLQ 的事件。
若要啟用end-to-end確認,請展開 Amazon S3 來源組態中的其他選項,然後選擇啟用end-to-end訊息確認。
來源背壓
當管道處理資料忙碌,或其接收器暫時停機或擷取資料的速度變慢時,管道可能會遇到背壓。OpenSearch Ingestion 有不同的處理背壓方式,取決於管道使用的來源外掛程式。
HTTP 來源
使用 HTTP 來源
-
緩衝區 – 當緩衝區已滿時,管道會開始將錯誤碼
REQUEST_TIMEOUT
為 408 的 HTTP 狀態傳回至來源端點。當緩衝區釋放時,管道會再次開始處理 HTTP 事件。 -
來源執行緒 – 當所有 HTTP 來源執行緒都忙於執行請求,且未處理的請求佇列大小已超過允許的最大請求數時,管道會開始將錯誤碼為 429
TOO_MANY_REQUESTS
的 HTTP 狀態傳回來源端點。當請求佇列低於允許的佇列大小上限時,管道會再次開始處理請求。
OTel 來源
當使用 OpenTelemetry 來源 (OTel 日誌REQUEST_TIMEOUT
錯誤碼為 408 的 HTTP 狀態傳回來源端點。當緩衝區釋放時,管道會再次開始處理事件。
S3 來源
當具有 S3
如果目的地關閉或無法擷取資料,且來源已啟用end-to-end確認,管道會停止處理 SQS 通知,直到收到來自所有目的地的成功確認為止。