

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# Amazon OpenSearch Ingestion 中的管道功能概觀
<a name="osis-features-overview"></a>

Amazon OpenSearch Ingestion 佈建*管道*，其中包含來源、緩衝區、零或多個處理器，以及一或多個接收器。擷取管道採用 Data Prepper 做為資料引擎。如需管道各種元件的概觀，請參閱 [Amazon OpenSearch Ingestion 中的重要概念](ingestion-process.md)。

下列各節提供 Amazon OpenSearch Ingestion 中一些最常用功能的概觀。

**注意**  
這不是可供管道使用的完整功能清單。如需所有可用管道功能的完整文件，請參閱 [Data Prepper 文件](https://opensearch.org/docs/latest/data-prepper/pipelines/pipelines/)。請注意，OpenSearch Ingestion 會限制您可以使用的外掛程式和選項。如需詳細資訊，請參閱[Amazon OpenSearch Ingestion 管道支援的外掛程式和選項](pipeline-config-reference.md)。

**Topics**
+ [持久性緩衝](#persistent-buffering)
+ [分割](#osis-features-splitting)
+ [鏈接](#osis-features-chaining)
+ [無效信件佇列](#osis-features-dlq)
+ [索引管理](#osis-features-index-management)
+ [End-to-end認可](#osis-features-e2e)
+ [來源背壓](#osis-features-backpressure)

## 持久性緩衝
<a name="persistent-buffering"></a>

持久性緩衝會將您的資料儲存在跨多個可用區域的磁碟型緩衝區中，以增強資料耐久性。您可以使用持久性緩衝，從所有支援的推送型來源擷取資料，而無需設定獨立的緩衝。這些來源包括日誌、追蹤和指標的 HTTP 和 OpenTelemetry。若要啟用持久性緩衝，請在建立或更新管道時選擇**啟用持久性緩衝**。如需詳細資訊，請參閱[建立 Amazon OpenSearch Ingestion 管道](creating-pipeline.md)。

OpenSearch Ingestion 會動態決定用於持久性緩衝、考量資料來源、串流轉換和目的地的 OCUs 數量。由於它將一些 OCUs 配置為緩衝，您可能需要增加最小和最大 OCUs以維持相同的擷取輸送量。管道會在緩衝區中保留資料長達 72 小時。

如果您為管道啟用持久性緩衝，預設請求承載大小上限如下：
+ **HTTP 來源** – 10 MB
+ **OpenTelemetry 來源** – 4 MB

對於 HTTP 來源，您可以將承載大小上限提高到 20 MB。請求承載大小包含整個 HTTP 請求，通常包含多個事件。每個事件不能超過 3.5 MB。

具有持久性緩衝的管道會在運算和緩衝單位之間分割設定的管道單位。如果管道使用 grok、key-value 或 split string 等 CPU 密集型處理器，則會以 1：1 buffer-to-compute比率配置單位。否則，它會以 3：1 的比例配置它們，一律偏好運算單位。

例如：
+ 具有 grok 和最多 2 個單位的管道 – 1 個運算單位和 1 個緩衝單位
+ 具有 grok 和最多 5 個單位的管道 – 3 個運算單位和 2 個緩衝單位
+ 沒有處理器和最多 2 個單位的管道 – 1 個運算單位和 1 個緩衝單位
+ 沒有處理器和最多 4 個單位的管道 – 1 個運算單位和 3 個緩衝單位
+ 具有 grok 和最多 5 個單位的管道 – 2 個運算單位和 3 個緩衝單位

根據預設，管道會使用 AWS 擁有的金鑰 來加密緩衝區資料。這些管道不需要管道角色的任何其他許可。

或者，您可以指定客戶受管金鑰，並將下列 IAM 許可新增至管道角色：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KeyAccess",
            "Effect": "Allow",
            "Action": [
              "kms:Decrypt",
              "kms:GenerateDataKeyWithoutPlaintext"
            ],
            "Resource": "arn:aws:kms:us-east-1:111122223333:key/ASIAIOSFODNN7EXAMPLE"
        }
    ]
}
```

------

如需更多資訊，請參閱 *AWS Key Management Service 開發人員指南*中的[客戶受管金鑰](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk)。

**注意**  
如果您停用持久性緩衝，您的管道會開始完全在記憶體內緩衝上執行。

## 分割
<a name="osis-features-splitting"></a>

您可以設定 OpenSearch Ingestion 管道，將傳入的事件*分割*為子管道，讓您在相同的傳入事件上執行不同類型的處理。

下列範例管道會將傳入的事件分割成兩個子管道。每個子管道都會使用自己的處理器來充實和操作資料，然後將資料傳送至不同的 OpenSearch 索引。

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  sink:
    - pipeline:
        name: "logs_enriched_one_pipeline"
    - pipeline:
        name: "logs_enriched_two_pipeline"

logs_enriched_one_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
        index: "enriched_one_logs"

logs_enriched_two_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_two_logs"
```

## 鏈接
<a name="osis-features-chaining"></a>

您可以將多個子管道*鏈結*在一起，以便在區塊中執行資料處理和擴充。換言之，您可以在一個子管道中使用特定處理功能來豐富傳入事件，然後將其傳送至另一個子管道，以使用不同的處理器進行額外擴充，最後將其傳送至其 OpenSearch 接收器。

在下列範例中，`log_pipeline`子管道會使用一組處理器來充實傳入日誌事件，然後將事件傳送至名為 的 OpenSearch 索引`enriched_logs`。管道會將相同的事件傳送至`log_advanced_pipeline`子管道，以處理該事件並將其傳送至名為 的不同 OpenSearch 索引`enriched_advanced_logs`。

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  processor:
    ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_logs"
    - pipeline:
        name: "log_advanced_pipeline"

log_advanced_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_advanced_logs"
```

## 無效信件佇列
<a name="osis-features-dlq"></a>

無效字母佇列 (DLQs) 是管道無法寫入目的地的事件目的地。在 OpenSearch Ingestion 中，您必須指定具有適當寫入許可的 Amazon S3 儲存貯體，以用作 DLQ。您可以將 DLQ 組態新增至管道中的每個接收器。當管道遇到寫入錯誤時，它會在設定的 S3 儲存貯體中建立 DLQ 物件。DLQ 物件存在於 JSON 檔案中，做為失敗事件的陣列。

當符合下列任一條件時，管道會將事件寫入 DLQ：
+ OpenSearch 接收器的**重試次數上限**已用盡。此設定至少需要 16 個 OpenSearch Ingestion。
+ 目的地因為錯誤條件而拒絕事件。

### Configuration
<a name="osis-features-dlq-config"></a>

若要設定子管道的無效字母佇列，請在設定目的地時選擇**啟用 S3 DLQ**。然後，指定佇列的必要設定。如需詳細資訊，請參閱 Data Prepper DLQ 文件中的[組態](https://opensearch.org/docs/latest/data-prepper/pipelines/dlq/#configuration)。

寫入此 S3 DLQ 的檔案具有下列命名模式：

```
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
```

如需手動設定管道角色以允許存取 DLQ 寫入的 S3 儲存貯體的說明，請參閱 [寫入 Amazon S3 或無效字母佇列的許可](pipeline-security-overview.md#pipeline-security-dlq)。

### 範例
<a name="osis-features-dlq-example"></a>

請考慮下列範例 DLQ 檔案：

```
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
```

以下是無法寫入目的地，並傳送至 DLQ S3 儲存貯體以進行進一步分析的資料範例：

```
Record_0	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index		  "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log		    "sample log"
timestamp	    "2023-04-14T10:36:01.070Z"

Record_1	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index               "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log                 "another sample log"
timestamp           "2023-04-14T10:36:01.071Z"
```

## 索引管理
<a name="osis-features-index-management"></a>

Amazon OpenSearch Ingestion 有許多索引管理功能，包括下列項目。

### 建立索引
<a name="osis-features-index-management-create"></a>

您可以在管道接收器中指定索引名稱，OpenSearch Ingestion 會在佈建管道時建立索引。如果索引已存在，管道會使用它來為傳入事件編製索引。如果您停止並重新啟動管道，或更新其 YAML 組態，管道會嘗試在尚未存在時建立新索引。管道永遠無法刪除索引。

佈建管道時，下列範例接收器會建立兩個索引：

```
sink:
  - opensearch:
      index: apache_logs
  - opensearch:
      index: nginx_logs
```

### 產生索引名稱和模式
<a name="osis-features-index-management-patterns"></a>

您可以使用傳入事件欄位中的變數來產生動態索引名稱。在接收器組態中，使用 格式`string${}`來發出字串插補訊號，並使用 JSON 指標從事件擷取欄位。的選項`index_type`為 `custom`或 `management_disabled`。由於 `custom` OpenSearch 網域`index_type`預設為 `management_disabled` ，OpenSearch Serverless 集合預設為 ，因此可以保持取消設定。

例如，下列管道會從傳入事件中選取 `metadataType` 欄位，以產生索引名稱。

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}"
```

下列組態會持續每天或每小時產生新的索引。

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd}"

pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
```

索引名稱也可以是純字串，並以日期時間模式做為尾碼，例如 `my-index-%{yyyy.MM.dd}`。當接收器將資料傳送至 OpenSearch 時，它會將日期時間模式取代為 UTC 時間，並為每天建立新的索引，例如 `my-index-2022.01.25`。如需詳細資訊，請參閱 [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) 類別。

此索引名稱也可以是格式化字串 （有或沒有日期時間模式尾碼），例如 `my-${index}-name`。當接收將資料傳送至 OpenSearch 時，它會將`"${index}"`部分取代為正在處理的事件中的 值。如果格式為 `"${index1/index2/index3}"`，則會將 欄位取代`index1/index2/index3`為事件中的值。

### 產生文件 IDs
<a name="osis-features-index-management-ids"></a>

將文件編製索引至 OpenSearch 時，管道可以產生文件 ID。它可以從傳入事件中的欄位推斷這些文件 IDs。

此範例使用來自傳入事件的 `uuid` 欄位來產生文件 ID。

```
pipeline:
  ...
  sink:
    opensearch:
      index_type: custom
      index: "metadata-${metadataType}-%{yyyy.MM.dd}" 
      "document_id": "uuid"
```

在下列範例中，[新增項目](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/add-entries/)處理器會合併`other_field`傳入事件中的欄位 `uuid`和 ，以產生文件 ID。

`create` 動作可確保具有相同 IDs的文件不會遭到覆寫。管道會捨棄重複的文件，而沒有任何重試或 DLQ 事件。對於使用此動作的管道作者來說，這是合理的預期，因為目標是避免更新現有的文件。

```
pipeline:
  ...
  processor:
   - add_entries:
      entries:
        - key: "my_doc_id_field"
          format: "${uuid}-${other_field}"
  sink:
    - opensearch:
       ...
       action: "create"
       document_id: "my_doc_id"
```

您可能想要將事件的文件 ID 設定為子物件中的欄位。在下列範例中，OpenSearch 接收器外掛程式會使用 子物件`info/id`來產生文件 ID。

```
sink:
  - opensearch:
       ...
       document_id: info/id
```

鑑於下列事件，管道將產生 `_id` 欄位設定為 的文件`json001`：

```
{
   "fieldA":"arbitrary value",
   "info":{
      "id":"json001",
      "fieldA":"xyz",
      "fieldB":"def"
   }
}
```

### 產生路由 IDs
<a name="osis-features-index-management-routing-ids"></a>

您可以使用 OpenSearch 接收器外掛程式中的 `routing_field`選項，將文件路由屬性 (`_routing`) 的值設定為來自傳入事件的值。

路由支援 JSON 指標語法，因此也提供巢狀欄位，而不只是最上層欄位。

```
sink:
  - opensearch:
       ...
       routing_field: metadata/id
       document_id: id
```

鑑於下列事件，外掛程式會產生將 `_routing` 欄位設定為 的文件`abcd`：

```
{
   "id":"123",
   "metadata":{
      "id":"abcd",
      "fieldA":"valueA"
   },
   "fieldB":"valueB"
}
```

如需建立管道可在索引建立期間使用的索引範本的說明，請參閱[索引範本](https://opensearch.org/docs/latest/im-plugin/index-templates/)。

## End-to-end認可
<a name="osis-features-e2e"></a>

OpenSearch Ingestion 使用*end-to-end確認*，追蹤其從來源到無狀態管道中接收器的交付，以確保資料的耐用性和可靠性。

**注意**  
目前，只有 [S3 來源](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/)外掛程式支援end-to-end確認。

透過end-to-end確認，管道來源外掛程式會建立*確認集*來監控一批事件。當這些事件成功傳送到其目的地時，會收到正面的確認，或當任何事件無法傳送到其目的地時，會收到負面的確認。

如果管道元件發生故障或當機，或來源無法接收確認，則來源會逾時，並採取必要動作，例如重試或記錄故障。如果管道已設定多個接收器或多個子管道，則只有在事件傳送至*所有*子管道*中的所有*接收器之後，才會傳送事件層級確認。如果接收器已設定 DLQ，end-to-end確認也會追蹤寫入 DLQ 的事件。

若要啟用end-to-end確認，請展開 Amazon S3 來源組態中的**其他選項**，然後選擇**啟用end-to-end訊息確認**。

## 來源背壓
<a name="osis-features-backpressure"></a>

當管道處理資料忙碌，或其接收器暫時停機或擷取資料的速度變慢時，管道可能會遇到背壓。OpenSearch Ingestion 有不同的處理背壓方式，取決於管道使用的來源外掛程式。

### HTTP 來源
<a name="osis-features-backpressure-http"></a>

使用 [HTTP 來源](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http-source/)外掛程式的管道會根據擁塞的管道元件，以不同的方式處理背壓：
+ **緩衝區** – 當緩衝區已滿時，管道會開始將錯誤碼`REQUEST_TIMEOUT`為 408 的 HTTP 狀態傳回至來源端點。當緩衝區釋放時，管道會再次開始處理 HTTP 事件。
+ **來源執行緒** – 當所有 HTTP 來源執行緒都忙於執行請求，且未處理的請求佇列大小已超過允許的最大請求數時，管道會開始將錯誤碼為 429 `TOO_MANY_REQUESTS`的 HTTP 狀態傳回來源端點。當請求佇列低於允許的佇列大小上限時，管道會再次開始處理請求。

### OTel 來源
<a name="osis-features-backpressure-otel"></a>

當使用 OpenTelemetry 來源 ([OTel 日誌](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-logs-source)、[OTel 指標](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-metrics-source/)和 [OTel 追蹤](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-trace/)) 的管道緩衝區已滿時，管道會開始將`REQUEST_TIMEOUT`錯誤碼為 408 的 HTTP 狀態傳回來源端點。當緩衝區釋放時，管道會再次開始處理事件。

### S3 來源
<a name="osis-features-backpressure-s3"></a>

當具有 [S3](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) 來源的管道緩衝區已滿時，管道會停止處理 SQS 通知。當緩衝區釋放時，管道會再次開始處理通知。

如果目的地關閉或無法擷取資料，且來源已啟用end-to-end確認，管道會停止處理 SQS 通知，直到收到來自所有目的地的成功確認為止。