使用与 Amazon OpenSearch Ingestion 互动 AWS SDKs - 亚马逊 OpenSearch 服务

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用与 Amazon OpenSearch Ingestion 互动 AWS SDKs

本节包含一个示例,说明如何使用与 Amazon OpenSearch Ingestion 进行交互。 AWS SDKs 该代码示例演示了如何创建域和管道,以及如何将数据摄取到管道中。

主题

Python

以下示例脚本使用 AWS SDK for Python (Boto3) 创建 IAM 管道角色、用于向其写入数据的域以及用于摄取数据的管道。然后,它使用 requests HTTP 库将示例日志文件提取到管道中。

要安装所需依赖项,请运行以下命令:

pip install boto3 pip install botocore pip install requests pip install requests-auth-aws-sigv4

在脚本中,将 account-id 的所有实例替换为 AWS 账户 ID。

import boto3 import botocore from botocore.config import Config import requests from requests_auth_aws_sigv4 import AWSSigV4 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default region. opensearch = boto3.client('opensearch', config=my_config) iam = boto3.client('iam', config=my_config) osis = boto3.client('osis', config=my_config) domainName = 'test-domain' # The name of the domain pipelineName = 'test-pipeline' # The name of the pipeline def createPipelineRole(iam, domainName): """Creates the pipeline role""" response = iam.create_policy( PolicyName='pipeline-policy', PolicyDocument=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Action\":\"es:DescribeDomain\",\"Resource\":\"arn:aws:es:us-east-1:account-id:domain\/{domainName}\"}},{{\"Effect\":\"Allow\",\"Action\":\"es:ESHttp*\",\"Resource\":\"arn:aws:es:us-east-1:account-id:domain\/{domainName}\/*\"}}]}}' ) policyarn = response['Policy']['Arn'] response = iam.create_role( RoleName='PipelineRole', AssumeRolePolicyDocument='{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"osis-pipelines.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}' ) rolename=response['Role']['RoleName'] response = iam.attach_role_policy( RoleName=rolename, PolicyArn=policyarn ) print('Creating pipeline role...') time.sleep(10) print('Role created: ' + rolename) def createDomain(opensearch, domainName): """Creates a domain to ingest data into""" response = opensearch.create_domain( DomainName=domainName, EngineVersion='OpenSearch_2.3', ClusterConfig={ 'InstanceType': 't2.small.search', 'InstanceCount': 5, 'DedicatedMasterEnabled': True, 'DedicatedMasterType': 't2.small.search', 'DedicatedMasterCount': 3 }, # Many instance types require EBS storage. EBSOptions={ 'EBSEnabled': True, 'VolumeType': 'gp2', 'VolumeSize': 10 }, AccessPolicies=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Principal\":{{\"AWS\":\"arn:aws:iam::account-id:role\/PipelineRole\"}},\"Action\":\"es:*\",\"Resource\":\"arn:aws:es:us-east-1:account-id:domain\/{domainName}\/*\"}}]}}', NodeToNodeEncryptionOptions={ 'Enabled': True } ) return(response) def waitForDomainProcessing(opensearch, domainName): """Waits for the domain to be active""" try: response = opensearch.describe_domain( DomainName=domainName ) # Every 30 seconds, check whether the domain is processing. while 'Endpoint' not in response['DomainStatus']: print('Creating domain...') time.sleep(60) response = opensearch.describe_domain( DomainName=domainName) # Once we exit the loop, the domain is ready for ingestion. endpoint = response['DomainStatus']['Endpoint'] print('Domain endpoint ready to receive data: ' + endpoint) createPipeline(osis, endpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceNotFoundException': print('Domain not found.') else: raise error def createPipeline(osis, endpoint): """Creates a pipeline using the domain and pipeline role""" try: definition = f'version: \"2\"\nlog-pipeline:\n source:\n http:\n path: \"/${{pipelineName}}/logs\"\n processor:\n - date:\n from_time_received: true\n destination: \"@timestamp\"\n sink:\n - opensearch:\n hosts: [ \"https://{endpoint}\" ]\n index: \"application_logs\"\n aws:\n region: \"us-east-1\"' response = osis.create_pipeline( PipelineName=pipelineName, MinUnits=4, MaxUnits=9, PipelineConfigurationBody=definition, PipelineRoleArn="arn:aws:iam::account-id:role/PipelineRole" ) response = osis.get_pipeline( PipelineName=pipelineName ) # Every 30 seconds, check whether the pipeline is active. while response['Pipeline']['Status'] == 'CREATING': print('Creating pipeline...') time.sleep(30) response = osis.get_pipeline( PipelineName=pipelineName) # Once we exit the loop, the pipeline is ready for ingestion. ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] print('Pipeline ready to ingest data at endpoint: ' + ingestionEndpoint) ingestData(ingestionEndpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceAlreadyExistsException': print('Pipeline already exists.') response = osis.get_pipeline( PipelineName=pipelineName ) ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] ingestData(ingestionEndpoint) else: raise error def ingestData(ingestionEndpoint): """Ingests a sample log file into the pipeline""" endpoint = 'https://' + ingestionEndpoint r = requests.request('POST', f'{endpoint}/log-pipeline/logs', data='[{"time":"2014-08-11T11:40:13+00:00","remote_addr":"122.226.223.69","status":"404","request":"GET http://www.k2proxy.com//hello.html HTTP/1.1","http_user_agent":"Mozilla/4.0 (compatible; WOW64; SLCC2;)"}]', auth=AWSSigV4('osis')) print('Ingesting sample log file into pipeline') print('Response: ' + r.text) def main(): createPipelineRole(iam, domainName) createDomain(opensearch, domainName) waitForDomainProcessing(opensearch, domainName) if __name__ == "__main__": main()