本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將資料擷取至 Amazon OpenSearch Serverless 集合
這些章節提供有關支援的擷取管道,以將資料擷取至 Amazon OpenSearch Serverless 集合的詳細資訊。同時還涵蓋了一些您可以用於與 OpenSearch API 操作互動的用戶端。您的用戶端應與 OpenSearch 2.x 相容,以便與 OpenSearch Serverless 整合。
主題
所需的最低許可
若要將資料擷取至 OpenSearch Serverless 集合,寫入資料的主體必須在資料存取政策中指派下列最低許可:
[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/
target-collection
/logs
" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012
:user/my-user
" ] } ]
如果您計劃寫入到其他索引,許可的範圍可能會更廣泛。例如,您可以允許對所有索引 (index/target-collection
/*) 或索引子集合 (index/target-collection
/logs*
) 的許可,而不是指定單一目標索引。
如需所有可用 OpenSearch API 操作及其相關許可的參考資料,請參閱 Amazon OpenSearch Serverless 中支援的操作和外掛程式。
OpenSearch 擷取
您可以使用 Amazon OpenSearch Ingestion,而不是使用第三方用戶端直接將資料傳送至 OpenSearch Serverless 集合。 OpenSearch 您可以將資料生產者設定為將資料傳送至 OpenSearch Ingestion,並自動將資料交付至您指定的集合。您也可以設定 OpenSearch Ingestion 在交付資料之前轉換資料。如需詳細資訊,請參閱Amazon OpenSearch 擷取概觀。
OpenSearch Ingestion 管道需要許可,才能寫入設定為接收器的 OpenSearch Serverless 集合。這些許可包括描述集合並向其傳送 HTTP 請求的能力。如需使用 OpenSearch Ingestion 將資料新增至集合的指示,請參閱 授予 Amazon OpenSearch Ingestion 管道對集合的存取權。
若要開始使用 OpenSearch Ingestion,請參閱 教學課程:使用 Amazon OpenSearch Ingestion 將資料擷取至集合。
Fluent Bit
您可以使用 AWS
for Fluent Bit 映像
注意
您必須擁有 AWS 適用於 Fluent Bit 映像的 2.30.0 版或更新版本,才能與 OpenSearch Serverless 整合。
範例組態:
這個組態檔案的範例輸出區段顯示如何使用 OpenSearch Serverless 集合作為目的地。重要的補充是 AWS_Service_Name
參數,也就是 aoss
。Host
是集合端點。
[OUTPUT] Name opensearch Match * Host
collection-endpoint
.us-west-2
.aoss.amazonaws.com Port 443 Indexmy_index
Trace_Error On Trace_Output On AWS_Auth On AWS_Region<region>
AWS_Service_Name aoss tls On Suppress_Type_Name On
Amazon Data Firehose
Firehose 支援 OpenSearch Serverless 作為交付目的地。如需將資料傳送至 OpenSearch Serverless 的指示,請參閱《Amazon Data Firehose 開發人員指南》中的建立 Kinesis Data Firehose 交付串流和為您的目的地選擇 OpenSearch Serverless。
您提供給 Firehose 交付的 IAM 角色必須在具有目標集合aoss:WriteDocument
最低許可的資料存取政策中指定,而且您必須具有預先存在的索引才能傳送資料。如需詳細資訊,請參閱所需的最低許可。
將資料傳送至 OpenSearch Serverless 之前,您可能需要執行資料轉換。如需進一步了解如何使用 Lambda 函數來執行此任務,請參閱相同指南中的 Amazon Kinesis Data Firehose 資料轉換。
Go
下列範本程式碼會使用 Go 的 opensearch-goregion
和 host
的值。
package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }
Java
下列範本程式碼會使用 Java 的 opensearch-javaregion
和 host
的值。
與 OpenSearch Service 網域相較,重要的區別在於服務名稱 (是 aoss
而不是 es
)。
// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();
下列範例程式碼會再次建立安全連線,然後搜尋索引。
import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();
JavaScript
下列範本程式碼會使用 JavaScript 的 opensearch-jsnode
和 region
的值。
與 OpenSearch Service 網域相較,重要的區別在於服務名稱 (是 aoss
而不是 es
)。
Logstash
您可以使用 Logstash OpenSearch 外掛程式
使用 Logstash 將資料傳送至 OpenSearch Serverless
-
使用 Docker 或 Linux 安裝 logstash-output-opensearch 外掛程式的 2.0.0 版或更新版本。 logstash-output-opensearch
-
若要讓 OpenSearch 輸出外掛程式與 OpenSearch Serverless 搭配使用,您必須對 logstash.conf 的
opensearch
輸出區段進行下列修改:-
將
aoss
指定為auth_type
下的service_name
。 -
針對
hosts
指定您的集合端點。 -
新增參數
default_server_major_version
和legacy_template
。外掛程式需要這些參數才能與 OpenSearch Serverless 搭配使用。
output { opensearch { hosts => "
collection-endpoint
:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }此組態檔案範例會從 S3 儲存貯體中的檔案取得輸入,並將其傳送至 OpenSearch Serverless 集合:
input { s3 { bucket => "
my-s3-bucket
" region => "us-east-1
" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint
.us-east-1
.aoss.amazonaws.com:443" index =>my-index
auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key
' aws_secret_access_key => 'your-secret-key
' region => 'us-east-1
' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } } -
-
然後,使用新的組態執行 Logstash 來測試外掛程式:
bin/logstash -f config/
test-plugin
.conf
Python
下列範本程式碼會使用 Python 的 opensearch-pyregion
和 host
的值。
與 OpenSearch Service 網域相較,重要的區別在於服務名稱 (是 aoss
而不是 es
)。
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)
Ruby
opensearch-aws-sigv4
Gem 套件提供了立即使用的 OpenSearch Serverless 的存取,以及 OpenSearch 服務。它具有 opensearch-ruby
執行個體化 Sigv4 簽署者時,請指定 aoss
為服務名稱:
require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)
使用其他用戶端簽署 HTTP 請求
當您使用其他用戶端建構 HTTP 請求,並簽署對 OpenSearch Serverless 集合的請求時,下列要求適用。
-
您必須將服務名稱指定為
aoss
。 -
所有 AWS Signature 版本 4 請求均需要
x-amz-content-sha256
標頭。其提供請求承載的雜湊。如果有請求承載,請將值設定為其安全雜湊演算法 (SHA) 加密雜湊 (SHA256)。如果沒有請求承載,請將值設定為e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
,這是一個空字串的雜湊。
使用 cURL 編製索引
下列範例請求使用用戶端 URL 請求程式庫 (cURL),將單一文件傳送至集合movies-index
中名為 的索引:
curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:
us-east-1
:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint
.us-east-1
.aoss.amazonaws.com/movies-index
/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'
使用 Postman 編製索引
下圖顯示如何使用 Postman 將請求傳送至集合。如需驗證的指示,請參閱 Postman 中的使用 AWS 簽章驗證工作流程進行驗證
