將資料擷取至 Amazon OpenSearch Serverless 集合 - Amazon OpenSearch Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將資料擷取至 Amazon OpenSearch Serverless 集合

這些章節提供有關支援的擷取管道,以將資料擷取至 Amazon OpenSearch Serverless 集合的詳細資訊。同時還涵蓋了一些您可以用於與 OpenSearch API 操作互動的用戶端。您的用戶端應與 OpenSearch 2.x 相容,以便與 OpenSearch Serverless 整合。

所需的最低許可

若要將資料擷取至 OpenSearch Serverless 集合,寫入資料的主體必須在資料存取政策中指派下列最低許可:

[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/target-collection/logs" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012:user/my-user" ] } ]

如果您計劃寫入到其他索引,許可的範圍可能會更廣泛。例如,您可以允許對所有索引 (index/target-collection/*) 或索引子集合 (index/target-collection/logs*) 的許可,而不是指定單一目標索引。

如需所有可用 OpenSearch API 操作及其相關許可的參考資料,請參閱 Amazon OpenSearch Serverless 中支援的操作和外掛程式

OpenSearch 擷取

您可以使用 Amazon OpenSearch Ingestion,而不是使用第三方用戶端直接將資料傳送至 OpenSearch Serverless 集合。 OpenSearch 您可以將資料生產者設定為將資料傳送至 OpenSearch Ingestion,並自動將資料交付至您指定的集合。您也可以設定 OpenSearch Ingestion 在交付資料之前轉換資料。如需詳細資訊,請參閱Amazon OpenSearch 擷取概觀

OpenSearch Ingestion 管道需要許可,才能寫入設定為接收器的 OpenSearch Serverless 集合。這些許可包括描述集合並向其傳送 HTTP 請求的能力。如需使用 OpenSearch Ingestion 將資料新增至集合的指示,請參閱 授予 Amazon OpenSearch Ingestion 管道對集合的存取權

若要開始使用 OpenSearch Ingestion,請參閱 教學課程:使用 Amazon OpenSearch Ingestion 將資料擷取至集合

Fluent Bit

您可以使用 AWS for Fluent Bit 映像OpenSearch 輸出外掛程式,將資料擷取至 OpenSearch Serverless 集合。

注意

您必須擁有 AWS 適用於 Fluent Bit 映像的 2.30.0 版或更新版本,才能與 OpenSearch Serverless 整合。

範例組態

這個組態檔案的範例輸出區段顯示如何使用 OpenSearch Serverless 集合作為目的地。重要的補充是 AWS_Service_Name 參數,也就是 aossHost 是集合端點。

[OUTPUT] Name opensearch Match * Host collection-endpoint.us-west-2.aoss.amazonaws.com Port 443 Index my_index Trace_Error On Trace_Output On AWS_Auth On AWS_Region <region> AWS_Service_Name aoss tls On Suppress_Type_Name On

Amazon Data Firehose

Firehose 支援 OpenSearch Serverless 作為交付目的地。如需將資料傳送至 OpenSearch Serverless 的指示,請參閱《Amazon Data Firehose 開發人員指南》中的建立 Kinesis Data Firehose 交付串流為您的目的地選擇 OpenSearch Serverless

您提供給 Firehose 交付的 IAM 角色必須在具有目標集合aoss:WriteDocument最低許可的資料存取政策中指定,而且您必須具有預先存在的索引才能傳送資料。如需詳細資訊,請參閱所需的最低許可

將資料傳送至 OpenSearch Serverless 之前,您可能需要執行資料轉換。如需進一步了解如何使用 Lambda 函數來執行此任務,請參閱相同指南中的 Amazon Kinesis Data Firehose 資料轉換

Go

下列範本程式碼會使用 Go 的 opensearch-go 用戶端,建立與指定 OpenSearch Serverless 集合的安全連線,並建立單一索引。您必須提供 regionhost 的值。

package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }

Java

下列範本程式碼會使用 Java 的 opensearch-java 用戶端,建立與指定 OpenSearch Serverless 集合的安全連線,並建立單一索引。您必須提供 regionhost 的值。

與 OpenSearch Service 網域相較,重要的區別在於服務名稱 (是 aoss 而不是 es)。

// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();

下列範例程式碼會再次建立安全連線,然後搜尋索引。

import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();

JavaScript

下列範本程式碼會使用 JavaScript 的 opensearch-js 用戶端,建立與指定 OpenSearch Serverless 集合的安全連線,建立單一索引,新增文件,以及刪除索引。您必須提供 noderegion 的值。

與 OpenSearch Service 網域相較,重要的區別在於服務名稱 (是 aoss 而不是 es)。

Version 3

此範例使用適用於 JavaScript in Node.js 的 SDK 的版本 3

const { defaultProvider } = require('@aws-sdk/credential-provider-node'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => { const credentialsProvider = defaultProvider(); return credentialsProvider(); }, }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();
Version 2

此範例使用適用於 JavaScript in Node.js 的 SDK 的版本 2

const AWS = require('aws-sdk'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => new Promise((resolve, reject) => { AWS.config.getCredentials((err, credentials) => { if (err) { reject(err); } else { resolve(credentials); } }); }), }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();

Logstash

您可以使用 Logstash OpenSearch 外掛程式,將日誌發佈至 OpenSearch Serverless 集合。

使用 Logstash 將資料傳送至 OpenSearch Serverless
  1. 使用 Docker 或 Linux 安裝 logstash-output-opensearch 外掛程式的 2.0.0 版或更新版本logstash-output-opensearch

    Docker

    Docker 託管預先安裝了 OpenSearch 輸出外掛程式的 Logstash OSS 軟體:opensearchproject/logstash-oss-with-opensearch-output-plugin。您可以像任何其他映像一樣提取映像:

    docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
    Linux

    首先,如果您尚未安裝最新版本的 Logstash,請先安裝。然後,安裝輸出外掛程式的 2.0.0 版本:

    cd logstash-8.5.0/ bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch

    如果外掛程式已安裝,請將其更新至最新版本:

    bin/logstash-plugin update logstash-output-opensearch

    從 2.0.0 版的外掛程式開始, AWS 軟體開發套件使用 第 3 版。如果您使用的是早於 8.4.0 的 Logstash 版本,則必須移除任何預先安裝的 AWS 外掛程式,並安裝logstash-integration-aws外掛程式:

    /usr/share/logstash/bin/logstash-plugin remove logstash-input-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-output-sns /usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch /usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
  2. 若要讓 OpenSearch 輸出外掛程式與 OpenSearch Serverless 搭配使用,您必須對 logstash.conf 的 opensearch 輸出區段進行下列修改:

    • aoss 指定為 auth_type 下的 service_name

    • 針對 hosts 指定您的集合端點。

    • 新增參數 default_server_major_versionlegacy_template。外掛程式需要這些參數才能與 OpenSearch Serverless 搭配使用。

    output { opensearch { hosts => "collection-endpoint:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }

    此組態檔案範例會從 S3 儲存貯體中的檔案取得輸入,並將其傳送至 OpenSearch Serverless 集合:

    input { s3 { bucket => "my-s3-bucket" region => "us-east-1" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443" index => my-index auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key' aws_secret_access_key => 'your-secret-key' region => 'us-east-1' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }
  3. 然後,使用新的組態執行 Logstash 來測試外掛程式:

    bin/logstash -f config/test-plugin.conf

Python

下列範本程式碼會使用 Python 的 opensearch-py 用戶端,建立與指定 OpenSearch Serverless 集合的安全連線,建立單一索引並搜尋該索引。您必須提供 regionhost 的值。

與 OpenSearch Service 網域相較,重要的區別在於服務名稱 (是 aoss 而不是 es)。

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)

Ruby

opensearch-aws-sigv4 Gem 套件提供了立即使用的 OpenSearch Serverless 的存取,以及 OpenSearch 服務。它具有 opensearch-ruby 客戶端的所有功能,因為這是此 Gem 套件的相依項目。

執行個體化 Sigv4 簽署者時,請指定 aoss 為服務名稱:

require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)

使用其他用戶端簽署 HTTP 請求

當您使用其他用戶端建構 HTTP 請求,並簽署對 OpenSearch Serverless 集合的請求時,下列要求適用。

  • 您必須將服務名稱指定為 aoss

  • 所有 AWS Signature 版本 4 請求均需要 x-amz-content-sha256 標頭。其提供請求承載的雜湊。如果有請求承載,請將值設定為其安全雜湊演算法 (SHA) 加密雜湊 (SHA256)。如果沒有請求承載,請將值設定為 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855,這是一個空字串的雜湊。

使用 cURL 編製索引

下列範例請求使用用戶端 URL 請求程式庫 (cURL),將單一文件傳送至集合movies-index中名為 的索引:

curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:us-east-1:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com/movies-index/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'

使用 Postman 編製索引

下圖顯示如何使用 Postman 將請求傳送至集合。如需驗證的指示,請參閱 Postman 中的使用 AWS 簽章驗證工作流程進行驗證

JSON response showing creation of a "movies-index" with successful result and no shards.