SDK for Python(Boto3)을 사용한 Neptune 예제 - AWS SDK 코드 예제

Doc AWS SDK 예제 GitHub 리포지토리에서 더 많은 SDK 예제를 사용할 수 있습니다. AWS

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

SDK for Python(Boto3)을 사용한 Neptune 예제

다음 코드 예제에서는 Neptune과 AWS SDK for Python (Boto3) 함께를 사용하여 작업을 수행하고 일반적인 시나리오를 구현하는 방법을 보여줍니다.

기본 사항은 서비스 내에서 필수 작업을 수행하는 방법을 보여주는 코드 예제입니다.

작업은 대규모 프로그램에서 발췌한 코드이며 컨텍스트에 맞춰 실행해야 합니다. 작업은 관련 시나리오의 컨텍스트에 따라 표시되며, 개별 서비스 함수를 직접적으로 호출하는 방법을 보여줍니다.

각 예시에는 전체 소스 코드에 대한 링크가 포함되어 있으며, 여기에서 컨텍스트에 맞춰 코드를 설정하고 실행하는 방법에 대한 지침을 찾을 수 있습니다.

시작

다음 코드 예제에서는 Neptune 사용을 시작하는 방법을 보여줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

import boto3 from botocore.exceptions import ClientError def describe_db_clusters(neptune_client): """ Describes the Amazon Neptune DB clusters using a paginator to handle multiple pages. Raises ClientError with 'ResourceNotFoundException' if no clusters are found. """ paginator = neptune_client.get_paginator("describe_db_clusters") clusters_found = False for page in paginator.paginate(): for cluster in page.get("DBClusters", []): clusters_found = True print(f"Cluster Identifier: {cluster['DBClusterIdentifier']}") print(f"Status: {cluster['Status']}") if not clusters_found: raise ClientError( { "Error": { "Code": "ResourceNotFoundException", "Message": "No Neptune DB clusters found." } }, operation_name="DescribeDBClusters" ) def main(): """ Main entry point: creates the Neptune client and calls the describe operation. """ neptune_client = boto3.client("neptune") try: describe_db_clusters(neptune_client) except ClientError as e: error_code = e.response["Error"]["Code"] if error_code == "ResourceNotFoundException": print(f"Resource not found: {e.response['Error']['Message']}") else: print(f"Unexpected ClientError: {e.response['Error']['Message']}") except Exception as e: print(f"Unexpected error: {str(e)}") if __name__ == "__main__": main()

기본 사항

다음 코드 예제는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • Amazon Neptune 서브넷 그룹을 생성합니다.

  • Neptune 클러스터를 생성합니다.

  • Neptune 인스턴스를 생성합니다.

  • Neptune 인스턴스의 상태를 확인합니다.

  • Neptune 클러스터 세부 정보를 표시합니다.

  • Neptune 클러스터를 중지합니다.

  • Neptune 클러스터를 시작합니다.

  • Neptune 자산을 삭제합니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

import boto3 import time from botocore.exceptions import ClientError # Constants used in this scenario POLL_INTERVAL_SECONDS = 10 TIMEOUT_SECONDS = 1200 # 20 minutes def delete_db_cluster(neptune_client, cluster_id: str): """ Deletes a Neptune DB cluster and throws exceptions to the caller. Args: neptune_client (boto3.client): The Neptune client object. cluster_id (str): The ID of the Neptune DB cluster to be deleted. Raises: ClientError: If the delete operation fails. """ request = { 'DBClusterIdentifier': cluster_id, 'SkipFinalSnapshot': True } try: print(f"Deleting DB Cluster: {cluster_id}") neptune_client.delete_db_cluster(**request) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFoundFault": print(f"Cluster '{cluster_id}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete DB cluster. {code}: {message}") raise def format_elapsed_time(seconds: int) -> str: mins, secs = divmod(seconds, 60) hours, mins = divmod(mins, 60) return f"{hours:02}:{mins:02}:{secs:02}" def delete_db_instance(neptune_client, instance_id: str): """ Deletes a Neptune DB instance and waits for its deletion to complete. Raises exception to be handled by calling code. """ print(f"Initiating deletion of DB Instance: {instance_id}") try: neptune_client.delete_db_instance( DBInstanceIdentifier=instance_id, SkipFinalSnapshot=True ) print(f"Waiting for DB Instance '{instance_id}' to be deleted...") waiter = neptune_client.get_waiter('db_instance_deleted') waiter.wait( DBInstanceIdentifier=instance_id, WaiterConfig={ 'Delay': 30, 'MaxAttempts': 40 } ) print(f"DB Instance '{instance_id}' successfully deleted.") except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBInstanceNotFoundFault": print(f"Instance '{instance_id}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete DB instance. {code}: {message}") raise def delete_db_subnet_group(neptune_client, subnet_group_name): """ Deletes a Neptune DB subnet group synchronously using Boto3. Args: neptune_client (boto3.client): The Neptune client. subnet_group_name (str): The name of the DB subnet group to delete. Raises: ClientError: If the delete operation fails. """ delete_group_request = { 'DBSubnetGroupName': subnet_group_name } try: neptune_client.delete_db_subnet_group(**delete_group_request) print(f"️ Deleting Subnet Group: {subnet_group_name}") except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBSubnetGroupNotFoundFault": print(f"Subnet group '{subnet_group_name}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete subnet group. {code}: {message}") raise def wait_for_cluster_status( neptune_client, cluster_id: str, desired_status: str, timeout_seconds: int = TIMEOUT_SECONDS, poll_interval_seconds: int = POLL_INTERVAL_SECONDS ): """ Waits for a Neptune DB cluster to reach a desired status. Args: neptune_client (boto3.client): The Amazon Neptune client. cluster_id (str): The identifier of the Neptune DB cluster. desired_status (str): The target status (e.g., "available", "stopped"). timeout_seconds (int): Max time to wait in seconds (default: 1200). poll_interval_seconds (int): Polling interval in seconds (default: 10). Raises: RuntimeError: If the desired status is not reached before timeout. """ print(f"Waiting for cluster '{cluster_id}' to reach status '{desired_status}'...") start_time = time.time() while True: # Prepare request object describe_cluster_request = { 'DBClusterIdentifier': cluster_id } # Call the Neptune API response = neptune_client.describe_db_clusters(**describe_cluster_request) clusters = response.get('DBClusters', []) current_status = clusters[0].get('Status') if clusters else None elapsed_seconds = int(time.time() - start_time) status_str = current_status if current_status else "Unknown" print( f"\r Elapsed: {format_elapsed_time(elapsed_seconds):<20} Cluster status: {status_str:<20}", end="", flush=True ) if current_status and current_status.lower() == desired_status.lower(): print( f"\nNeptune cluster reached desired status '{desired_status}' after {format_elapsed_time(elapsed_seconds)}." ) return if elapsed_seconds > timeout_seconds: raise RuntimeError(f"Timeout waiting for Neptune cluster to reach status: {desired_status}") time.sleep(poll_interval_seconds) def start_db_cluster(neptune_client, cluster_identifier: str): """ Starts an Amazon Neptune DB cluster and waits until it reaches 'available'. Args: neptune_client (boto3.client): The Neptune client. cluster_identifier (str): The DB cluster identifier. Raises: ClientError: Propagates AWS API issues like resource not found. RuntimeError: If cluster doesn't reach 'available' within timeout. """ try: # Initial wait in case the cluster was just stopped time.sleep(30) neptune_client.start_db_cluster(DBClusterIdentifier=cluster_identifier) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't start DB cluster. Here's why: {code}: {message}") raise start_time = time.time() paginator = neptune_client.get_paginator('describe_db_clusters') while True: try: pages = paginator.paginate(DBClusterIdentifier=cluster_identifier) clusters = [] for page in pages: clusters.extend(page.get('DBClusters', [])) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFound": print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.") else: print(f"Couldn't describe DB cluster. Here's why: {code}: {message}") raise status = clusters[0].get('Status') if clusters else None elapsed = time.time() - start_time print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True) if status and status.lower() == 'available': print(f"\n🎉 Cluster '{cluster_identifier}' is available.") return if elapsed > TIMEOUT_SECONDS: raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to become available.") time.sleep(POLL_INTERVAL_SECONDS) def stop_db_cluster(neptune_client, cluster_identifier: str): """ Stops an Amazon Neptune DB cluster and waits until it's fully stopped. Args: neptune_client (boto3.client): The Neptune client. cluster_identifier (str): The DB cluster identifier. Raises: ClientError: For AWS API errors (e.g., resource not found). RuntimeError: If the cluster doesn't stop within the timeout. """ try: neptune_client.stop_db_cluster(DBClusterIdentifier=cluster_identifier) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't stop DB cluster. Here's why: {code}: {message}") raise start_time = time.time() paginator = neptune_client.get_paginator('describe_db_clusters') while True: try: pages = paginator.paginate(DBClusterIdentifier=cluster_identifier) clusters = [] for page in pages: clusters.extend(page.get('DBClusters', [])) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFound": print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.") else: print(f"Couldn't describe DB cluster. Here's why: {code}: {message}") raise status = clusters[0].get('Status') if clusters else None elapsed = time.time() - start_time print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True) if status and status.lower() == 'stopped': print(f"\nCluster '{cluster_identifier}' is now stopped.") return if elapsed > TIMEOUT_SECONDS: raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to stop.") time.sleep(POLL_INTERVAL_SECONDS) def describe_db_clusters(neptune_client, cluster_id: str): """ Describes details of a Neptune DB cluster, paginating if needed. Args: neptune_client (boto3.client): The Neptune client. cluster_id (str): The ID of the cluster to describe. Raises: ClientError: If there's an AWS API error (e.g., cluster not found). """ paginator = neptune_client.get_paginator('describe_db_clusters') try: pages = paginator.paginate(DBClusterIdentifier=cluster_id) found = False for page in pages: for cluster in page.get('DBClusters', []): found = True print(f"Cluster Identifier: {cluster.get('DBClusterIdentifier')}") print(f"Status: {cluster.get('Status')}") print(f"Engine: {cluster.get('Engine')}") print(f"Engine Version: {cluster.get('EngineVersion')}") print(f"Endpoint: {cluster.get('Endpoint')}") print(f"Reader Endpoint: {cluster.get('ReaderEndpoint')}") print(f"Availability Zones: {cluster.get('AvailabilityZones')}") print(f"Subnet Group: {cluster.get('DBSubnetGroup')}") print("VPC Security Groups:") for vpc_group in cluster.get('VpcSecurityGroups', []): print(f" - {vpc_group.get('VpcSecurityGroupId')}") print(f"Storage Encrypted: {cluster.get('StorageEncrypted')}") print(f"IAM Auth Enabled: {cluster.get('IAMDatabaseAuthenticationEnabled')}") print(f"Backup Retention Period: {cluster.get('BackupRetentionPeriod')} days") print(f"Preferred Backup Window: {cluster.get('PreferredBackupWindow')}") print(f"Preferred Maintenance Window: {cluster.get('PreferredMaintenanceWindow')}") print("------") if not found: # Treat empty response as cluster not found raise ClientError( {"Error": {"Code": "DBClusterNotFound", "Message": f"No cluster found with ID '{cluster_id}'"}}, "DescribeDBClusters" ) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") elif code == "DBClusterNotFound": print(f"Cluster '{cluster_id}' not found. Please verify the cluster ID.") else: print(f"Couldn't describe DB cluster. Here's why: {code}: {message}") raise def check_instance_status(neptune_client, instance_id: str, desired_status: str): """ Polls the status of a Neptune DB instance until it reaches desired_status. Uses pagination via describe_db_instances — even for a single instance. Raises: ClientError: If describe_db_instances fails (e.g., instance not found). RuntimeError: If timeout expires before reaching desired status. """ paginator = neptune_client.get_paginator('describe_db_instances') start_time = time.time() while True: try: pages = paginator.paginate(DBInstanceIdentifier=instance_id) instances = [] for page in pages: instances.extend(page.get('DBInstances', [])) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBInstanceNotFound": print(f"Instance '{instance_id}' not found. Please verify the instance ID.") else: print(f"Failed to describe DB instance. {code}: {message}") raise current_status = instances[0].get('DBInstanceStatus') if instances else None elapsed = int(time.time() - start_time) print(f"\rElapsed: {format_elapsed_time(elapsed)} Status: {current_status}", end="", flush=True) if current_status and current_status.lower() == desired_status.lower(): print(f"\nInstance '{instance_id}' reached '{desired_status}' in {format_elapsed_time(elapsed)}.") return if elapsed > TIMEOUT_SECONDS: raise RuntimeError(f"Timeout waiting for '{instance_id}' to reach '{desired_status}'") time.sleep(POLL_INTERVAL_SECONDS) def create_db_instance(neptune_client, db_instance_id: str, db_cluster_id: str) -> str: try: request = { 'DBInstanceIdentifier': db_instance_id, 'DBInstanceClass': 'db.r5.large', 'Engine': 'neptune', 'DBClusterIdentifier': db_cluster_id } print(f"Creating Neptune DB Instance: {db_instance_id}") response = neptune_client.create_db_instance(**request) instance = response.get('DBInstance') if not instance or 'DBInstanceIdentifier' not in instance: raise RuntimeError("Instance creation succeeded but no ID returned.") print(f"Waiting for DB Instance '{db_instance_id}' to become available...") waiter = neptune_client.get_waiter('db_instance_available') waiter.wait( DBInstanceIdentifier=db_instance_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 40} ) print(f"DB Instance '{db_instance_id}' is now available.") return instance['DBInstanceIdentifier'] except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't create DB instance. Here's why: {code}: {message}") raise except Exception as e: print(f"Unexpected error creating DB instance '{db_instance_id}': {e}") raise RuntimeError(f"Unexpected error creating DB instance '{db_instance_id}': {e}") from e def create_db_cluster(neptune_client, db_name: str) -> str: """ Creates a Neptune DB cluster and returns its identifier. Args: neptune_client (boto3.client): The Neptune client object. db_name (str): The desired cluster identifier. Returns: str: The DB cluster identifier. Raises: RuntimeError: For any failure or AWS error, with a user-friendly message. """ request = { 'DBClusterIdentifier': db_name, 'Engine': 'neptune', 'DeletionProtection': False, 'BackupRetentionPeriod': 1 } try: response = neptune_client.create_db_cluster(**request) cluster = response.get('DBCluster') or {} cluster_id = cluster.get('DBClusterIdentifier') if not cluster_id: raise RuntimeError("Cluster created but no ID returned.") print(f"DB Cluster created: {cluster_id}") return cluster_id except ClientError as e: code = e.response["Error"]["Code"] message = e.response["Error"]["Message"] if code in ("ServiceQuotaExceededException", "DBClusterQuotaExceededFault"): raise RuntimeError("You have exceeded the quota for Neptune DB clusters.") from e else: raise RuntimeError(f"AWS error [{code}]: {message}") from e except Exception as e: raise RuntimeError(f"Unexpected error creating DB cluster '{db_name}': {e}") from e def get_subnet_ids(vpc_id: str) -> list[str]: ec2_client = boto3.client('ec2') describe_subnets_request = { 'Filters': [{'Name': 'vpc-id', 'Values': [vpc_id]}] } response = ec2_client.describe_subnets(**describe_subnets_request) subnets = response.get('Subnets', []) subnet_ids = [subnet['SubnetId'] for subnet in subnets if 'SubnetId' in subnet] return subnet_ids def get_default_vpc_id() -> str: ec2_client = boto3.client('ec2') describe_vpcs_request = { 'Filters': [{'Name': 'isDefault', 'Values': ['true']}] } response = ec2_client.describe_vpcs(**describe_vpcs_request) vpcs = response.get('Vpcs', []) if not vpcs: raise RuntimeError("No default VPC found in this region.") default_vpc_id = vpcs[0]['VpcId'] print(f"Default VPC ID: {default_vpc_id}") return default_vpc_id def create_subnet_group(neptune_client, group_name: str): """ Creates a Neptune DB subnet group and returns its name and ARN. Args: neptune_client (boto3.client): The Neptune client object. group_name (str): The desired name of the subnet group. Returns: tuple(str, str): (subnet_group_name, subnet_group_arn) Raises: RuntimeError: For quota errors or other AWS-related failures. """ vpc_id = get_default_vpc_id() subnet_ids = get_subnet_ids(vpc_id) request = { 'DBSubnetGroupName': group_name, 'DBSubnetGroupDescription': 'My Neptune subnet group', 'SubnetIds': subnet_ids, 'Tags': [{'Key': 'Environment', 'Value': 'Dev'}] } try: response = neptune_client.create_db_subnet_group(**request) sg = response.get("DBSubnetGroup", {}) name = sg.get("DBSubnetGroupName") arn = sg.get("DBSubnetGroupArn") if not name or not arn: raise RuntimeError("Response missing subnet group name or ARN.") print(f"Subnet group created: {name}") print(f"ARN: {arn}") return name, arn except ClientError as e: code = e.response["Error"]["Code"] msg = e.response["Error"]["Message"] if code == "ServiceQuotaExceededException": print("Subnet group quota exceeded.") raise RuntimeError("Subnet group quota exceeded.") from e else: print(f"AWS error [{code}]: {msg}") raise RuntimeError(f"AWS error [{code}]: {msg}") from e except Exception as e: print(f"Unexpected error creating subnet group '{group_name}': {e}") raise RuntimeError(f"Unexpected error creating subnet group '{group_name}': {e}") from e def wait_for_input_to_continue(): input("\nPress <ENTER> to continue...") print("Continuing with the program...\n") def run_scenario(neptune_client, subnet_group_name: str, db_instance_id: str, cluster_name: str): print("-" * 88) print("1. Create a Neptune DB Subnet Group") wait_for_input_to_continue() try: name, arn = create_subnet_group(neptune_client, subnet_group_name) print(f"Subnet group successfully created: {name}") print("-" * 88) print("2. Create a Neptune Cluster") wait_for_input_to_continue() db_cluster_id = create_db_cluster(neptune_client, cluster_name) print("-" * 88) print("3. Create a Neptune DB Instance") wait_for_input_to_continue() create_db_instance(neptune_client, db_instance_id, cluster_name) print("-" * 88) print("4. Check the status of the Neptune DB Instance") print(""" Even though you're targeting a single DB instance, describe_db_instances supports pagination and can return multiple pages. Handling paginated responses ensures your method continues to work reliably even if AWS returns large or paged results. """) wait_for_input_to_continue() check_instance_status(neptune_client, db_instance_id, "available") print("-" * 88) print("5. Show Neptune Cluster details") wait_for_input_to_continue() describe_db_clusters(neptune_client, db_cluster_id) print("-" * 88) print("6. Stop the Amazon Neptune cluster") print(""" Boto3 doesn't currently offer a built-in waiter for stop_db_cluster, This example implements a custom polling strategy until the cluster is in a stopped state. """) wait_for_input_to_continue() stop_db_cluster(neptune_client, db_cluster_id) check_instance_status(neptune_client, db_instance_id, "stopped") print("-" * 88) print("7. Start the Amazon Neptune cluster") print(""" Boto3 doesn't currently offer a built-in waiter for start_db_cluster, This example implements a custom polling strategy until the cluster is in an available state. """) wait_for_input_to_continue() start_db_cluster(neptune_client, db_cluster_id) wait_for_cluster_status(neptune_client, db_cluster_id, "available") check_instance_status(neptune_client, db_instance_id, "available") print("All Neptune resources are now available.") print("-" * 88) print("-" * 88) print("8. Delete the Neptune Assets") print("Would you like to delete the Neptune Assets? (y/n)") del_ans = input().strip().lower() if del_ans == "y": print("You selected to delete the Neptune assets.") delete_db_instance(neptune_client, db_instance_id) delete_db_cluster(neptune_client, db_cluster_id) delete_db_subnet_group(neptune_client, subnet_group_name) print("Neptune resources deleted successfully") except ClientError as ce: code = ce.response["Error"]["Code"] if code in ("DBInstanceNotFound", "DBInstanceNotFoundFault", "ResourceNotFound"): print(f"Instance '{db_instance_id}' not found.") elif code in ("DBClusterNotFound", "DBClusterNotFoundFault", "ResourceNotFoundFault"): print(f"Cluster '{cluster_name}' not found.") elif code == "DBSubnetGroupNotFoundFault": print(f"Subnet group '{subnet_group_name}' not found.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"AWS error [{code}]: {ce.response['Error']['Message']}") raise # re-raise unexpected errors except RuntimeError as re: print(f"Runtime error or timeout: {re}") def main(): neptune_client = boto3.client('neptune') # Customize the following names to match your Neptune setup # (You must change these to unique values for your environment) subnet_group_name = "neptuneSubnetGroup111" cluster_name = "neptuneCluster111" db_instance_id = "neptuneDB111" print(""" Amazon Neptune is a fully managed graph database service by AWS... Let's get started! """) wait_for_input_to_continue() run_scenario(neptune_client, subnet_group_name, db_instance_id, cluster_name) print(""" Thank you for checking out the Amazon Neptune Service Use demo. For more AWS code examples, visit: https://docs.aws.amazon.com/code-library/latest/ug/what-is-code-library.html """) if __name__ == "__main__": main()

작업

다음 코드 예시는 CreateDBCluster의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def create_db_cluster(neptune_client, db_name: str) -> str: """ Creates a Neptune DB cluster and returns its identifier. Args: neptune_client (boto3.client): The Neptune client object. db_name (str): The desired cluster identifier. Returns: str: The DB cluster identifier. Raises: RuntimeError: For any failure or AWS error, with a user-friendly message. """ request = { 'DBClusterIdentifier': db_name, 'Engine': 'neptune', 'DeletionProtection': False, 'BackupRetentionPeriod': 1 } try: response = neptune_client.create_db_cluster(**request) cluster = response.get('DBCluster') or {} cluster_id = cluster.get('DBClusterIdentifier') if not cluster_id: raise RuntimeError("Cluster created but no ID returned.") print(f"DB Cluster created: {cluster_id}") return cluster_id except ClientError as e: code = e.response["Error"]["Code"] message = e.response["Error"]["Message"] if code in ("ServiceQuotaExceededException", "DBClusterQuotaExceededFault"): raise RuntimeError("You have exceeded the quota for Neptune DB clusters.") from e else: raise RuntimeError(f"AWS error [{code}]: {message}") from e except Exception as e: raise RuntimeError(f"Unexpected error creating DB cluster '{db_name}': {e}") from e
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조CreateDBCluster를 참조하십시오.

다음 코드 예시는 CreateDBInstance의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def create_db_instance(neptune_client, db_instance_id: str, db_cluster_id: str) -> str: try: request = { 'DBInstanceIdentifier': db_instance_id, 'DBInstanceClass': 'db.r5.large', 'Engine': 'neptune', 'DBClusterIdentifier': db_cluster_id } print(f"Creating Neptune DB Instance: {db_instance_id}") response = neptune_client.create_db_instance(**request) instance = response.get('DBInstance') if not instance or 'DBInstanceIdentifier' not in instance: raise RuntimeError("Instance creation succeeded but no ID returned.") print(f"Waiting for DB Instance '{db_instance_id}' to become available...") waiter = neptune_client.get_waiter('db_instance_available') waiter.wait( DBInstanceIdentifier=db_instance_id, WaiterConfig={'Delay': 30, 'MaxAttempts': 40} ) print(f"DB Instance '{db_instance_id}' is now available.") return instance['DBInstanceIdentifier'] except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't create DB instance. Here's why: {code}: {message}") raise except Exception as e: print(f"Unexpected error creating DB instance '{db_instance_id}': {e}") raise RuntimeError(f"Unexpected error creating DB instance '{db_instance_id}': {e}") from e
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조CreateDBInstance를 참조하십시오.

다음 코드 예시는 CreateDBSubnetGroup의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def create_subnet_group(neptune_client, group_name: str): """ Creates a Neptune DB subnet group and returns its name and ARN. Args: neptune_client (boto3.client): The Neptune client object. group_name (str): The desired name of the subnet group. Returns: tuple(str, str): (subnet_group_name, subnet_group_arn) Raises: RuntimeError: For quota errors or other AWS-related failures. """ vpc_id = get_default_vpc_id() subnet_ids = get_subnet_ids(vpc_id) request = { 'DBSubnetGroupName': group_name, 'DBSubnetGroupDescription': 'My Neptune subnet group', 'SubnetIds': subnet_ids, 'Tags': [{'Key': 'Environment', 'Value': 'Dev'}] } try: response = neptune_client.create_db_subnet_group(**request) sg = response.get("DBSubnetGroup", {}) name = sg.get("DBSubnetGroupName") arn = sg.get("DBSubnetGroupArn") if not name or not arn: raise RuntimeError("Response missing subnet group name or ARN.") print(f"Subnet group created: {name}") print(f"ARN: {arn}") return name, arn except ClientError as e: code = e.response["Error"]["Code"] msg = e.response["Error"]["Message"] if code == "ServiceQuotaExceededException": print("Subnet group quota exceeded.") raise RuntimeError("Subnet group quota exceeded.") from e else: print(f"AWS error [{code}]: {msg}") raise RuntimeError(f"AWS error [{code}]: {msg}") from e except Exception as e: print(f"Unexpected error creating subnet group '{group_name}': {e}") raise RuntimeError(f"Unexpected error creating subnet group '{group_name}': {e}") from e
  • API 세부 정보는 SDK for Python (Boto3) API 참조의 CreateDBSubnetGroup을 참조하세요. AWS

다음 코드 예시는 CreateGraph의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

""" Running this example. ---------------------------------------------------------------------------------- VPC Networking Requirement: ---------------------------------------------------------------------------------- Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. It does not expose a public endpoint, so this code must be executed from: - An **AWS Lambda function** configured to run inside the same VPC - An **EC2 instance** or **ECS task** running in the same VPC - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** """ GRAPH_NAME = "sample-analytics-graph" def main(): config = Config(retries={"total_max_attempts": 1, "mode": "standard"}, read_timeout=None) client = boto3.client("neptune-graph", config=config) execute_create_graph(client, GRAPH_NAME) def execute_create_graph(client, graph_name): try: print("Creating Neptune graph...") response = client.create_graph( graphName=graph_name, provisionedMemory = 16 ) created_graph_name = response.get("name") graph_arn = response.get("arn") graph_endpoint = response.get("endpoint") print("Graph created successfully!") print(f"Graph Name: {created_graph_name}") print(f"Graph ARN: {graph_arn}") print(f"Graph Endpoint: {graph_endpoint}") except ClientError as e: print(f"Failed to create graph: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"Failed to create graph: {str(e)}") except Exception as e: print(f"Unexpected error: {str(e)}") if __name__ == "__main__": main()
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조CreateGraph를 참조하세요.

다음 코드 예시는 DeleteDBCluster의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def delete_db_cluster(neptune_client, cluster_id: str): """ Deletes a Neptune DB cluster and throws exceptions to the caller. Args: neptune_client (boto3.client): The Neptune client object. cluster_id (str): The ID of the Neptune DB cluster to be deleted. Raises: ClientError: If the delete operation fails. """ request = { 'DBClusterIdentifier': cluster_id, 'SkipFinalSnapshot': True } try: print(f"Deleting DB Cluster: {cluster_id}") neptune_client.delete_db_cluster(**request) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFoundFault": print(f"Cluster '{cluster_id}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete DB cluster. {code}: {message}") raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조CreateDBInstance를 참조하십시오.

다음 코드 예시는 DeleteDBInstance의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def delete_db_instance(neptune_client, instance_id: str): """ Deletes a Neptune DB instance and waits for its deletion to complete. Raises exception to be handled by calling code. """ print(f"Initiating deletion of DB Instance: {instance_id}") try: neptune_client.delete_db_instance( DBInstanceIdentifier=instance_id, SkipFinalSnapshot=True ) print(f"Waiting for DB Instance '{instance_id}' to be deleted...") waiter = neptune_client.get_waiter('db_instance_deleted') waiter.wait( DBInstanceIdentifier=instance_id, WaiterConfig={ 'Delay': 30, 'MaxAttempts': 40 } ) print(f"DB Instance '{instance_id}' successfully deleted.") except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBInstanceNotFoundFault": print(f"Instance '{instance_id}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete DB instance. {code}: {message}") raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DeleteDBInstance를 참조하십시오.

다음 코드 예시는 DeleteDBSubnetGroup의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def delete_db_subnet_group(neptune_client, subnet_group_name): """ Deletes a Neptune DB subnet group synchronously using Boto3. Args: neptune_client (boto3.client): The Neptune client. subnet_group_name (str): The name of the DB subnet group to delete. Raises: ClientError: If the delete operation fails. """ delete_group_request = { 'DBSubnetGroupName': subnet_group_name } try: neptune_client.delete_db_subnet_group(**delete_group_request) print(f"️ Deleting Subnet Group: {subnet_group_name}") except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBSubnetGroupNotFoundFault": print(f"Subnet group '{subnet_group_name}' not found or already deleted.") elif code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't delete subnet group. {code}: {message}") raise
  • API 세부 정보는 SDK for Python (Boto3) API 참조의 DeleteDBSubnetGroup을 참조하세요. AWS

다음 코드 예시는 DescribeDBClusters의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def describe_db_clusters(neptune_client, cluster_id: str): """ Describes details of a Neptune DB cluster, paginating if needed. Args: neptune_client (boto3.client): The Neptune client. cluster_id (str): The ID of the cluster to describe. Raises: ClientError: If there's an AWS API error (e.g., cluster not found). """ paginator = neptune_client.get_paginator('describe_db_clusters') try: pages = paginator.paginate(DBClusterIdentifier=cluster_id) found = False for page in pages: for cluster in page.get('DBClusters', []): found = True print(f"Cluster Identifier: {cluster.get('DBClusterIdentifier')}") print(f"Status: {cluster.get('Status')}") print(f"Engine: {cluster.get('Engine')}") print(f"Engine Version: {cluster.get('EngineVersion')}") print(f"Endpoint: {cluster.get('Endpoint')}") print(f"Reader Endpoint: {cluster.get('ReaderEndpoint')}") print(f"Availability Zones: {cluster.get('AvailabilityZones')}") print(f"Subnet Group: {cluster.get('DBSubnetGroup')}") print("VPC Security Groups:") for vpc_group in cluster.get('VpcSecurityGroups', []): print(f" - {vpc_group.get('VpcSecurityGroupId')}") print(f"Storage Encrypted: {cluster.get('StorageEncrypted')}") print(f"IAM Auth Enabled: {cluster.get('IAMDatabaseAuthenticationEnabled')}") print(f"Backup Retention Period: {cluster.get('BackupRetentionPeriod')} days") print(f"Preferred Backup Window: {cluster.get('PreferredBackupWindow')}") print(f"Preferred Maintenance Window: {cluster.get('PreferredMaintenanceWindow')}") print("------") if not found: # Treat empty response as cluster not found raise ClientError( {"Error": {"Code": "DBClusterNotFound", "Message": f"No cluster found with ID '{cluster_id}'"}}, "DescribeDBClusters" ) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") elif code == "DBClusterNotFound": print(f"Cluster '{cluster_id}' not found. Please verify the cluster ID.") else: print(f"Couldn't describe DB cluster. Here's why: {code}: {message}") raise
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DescribeDBClusters를 참조하십시오.

다음 코드 예시는 DescribeDBInstances의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def check_instance_status(neptune_client, instance_id: str, desired_status: str): """ Polls the status of a Neptune DB instance until it reaches desired_status. Uses pagination via describe_db_instances — even for a single instance. Raises: ClientError: If describe_db_instances fails (e.g., instance not found). RuntimeError: If timeout expires before reaching desired status. """ paginator = neptune_client.get_paginator('describe_db_instances') start_time = time.time() while True: try: pages = paginator.paginate(DBInstanceIdentifier=instance_id) instances = [] for page in pages: instances.extend(page.get('DBInstances', [])) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBInstanceNotFound": print(f"Instance '{instance_id}' not found. Please verify the instance ID.") else: print(f"Failed to describe DB instance. {code}: {message}") raise current_status = instances[0].get('DBInstanceStatus') if instances else None elapsed = int(time.time() - start_time) print(f"\rElapsed: {format_elapsed_time(elapsed)} Status: {current_status}", end="", flush=True) if current_status and current_status.lower() == desired_status.lower(): print(f"\nInstance '{instance_id}' reached '{desired_status}' in {format_elapsed_time(elapsed)}.") return if elapsed > TIMEOUT_SECONDS: raise RuntimeError(f"Timeout waiting for '{instance_id}' to reach '{desired_status}'") time.sleep(POLL_INTERVAL_SECONDS)
  • API 세부 정보는 AWS SDK for Python (Boto3) API 참조DescribeDBInstances를 참조하십시오.

다음 코드 예시는 ExecuteGremlinProfileQuery의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

# Replace this with your actual Neptune endpoint NEPTUNE_ENDPOINT = "https://[Specify Endpoint]:8182" def main(): """ Entry point of the program. Initializes the Neptune client and executes the Gremlin query. """ config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3}) neptune_client = boto3.client( "neptunedata", endpoint_url=NEPTUNE_ENDPOINT, config=config ) execute_gremlin_query(neptune_client) def execute_gremlin_query(neptune_client): """ Executes a Gremlin query against an Amazon Neptune database. """ try: print("Querying Neptune...") response = neptune_client.execute_gremlin_explain_query( gremlinQuery="g.V().has('code', 'ANC')" ) print("Full Response:") print(response['output'].read().decode('UTF-8')) except ClientError as e: print(f"Error calling Neptune: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"BotoCore error: {str(e)}") except Exception as e: print(f"Unexpected error: {str(e)}") if __name__ == "__main__": main()

다음 코드 예시는 ExecuteGremlinQuery의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

""" Running this example. ---------------------------------------------------------------------------------- VPC Networking Requirement: ---------------------------------------------------------------------------------- Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. It does not expose a public endpoint, so this code must be executed from: - An **AWS Lambda function** configured to run inside the same VPC - An **EC2 instance** or **ECS task** running in the same VPC - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** """ # Replace with your actual Neptune endpoint NEPTUNE_ENDPOINT = "https://[Specify-Your-Endpoint]:8182" def main(): """ Entry point of the program. Initializes the Neptune client and runs both EXPLAIN and PROFILE queries. """ config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3}) neptune_client = boto3.client( "neptunedata", endpoint_url=NEPTUNE_ENDPOINT, config=config ) try: run_profile_query(neptune_client) except ClientError as e: print(f"Neptune error: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"BotoCore error: {str(e)}") except Exception as e: print(f"Unexpected error: {str(e)}") def run_profile_query(neptune_client): """ Runs a PROFILE query on the Neptune graph database. """ print("Running Gremlin PROFILE query...") try: response = neptune_client.execute_gremlin_profile_query( gremlinQuery="g.V().has('code', 'ANC')" ) print("Profile Query Result:") output = response.get("output") if output: print(output.read().decode('utf-8')) else: print("No explain output returned.") except Exception as e: print(f"Failed to execute PROFILE query: {str(e)}") if __name__ == "__main__": main()
  • API 세부 정보는 SDK for Python (Boto3) API 참조의 ExecuteGremlinQuery를 참조하세요. AWS

다음 코드 예시는 ExecuteOpenCypherExplainQuery의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

# Replace with your actual Neptune endpoint URL NEPTUNE_ENDPOINT = "https://<your-neptune-endpoint>:8182" def main(): """ Entry point: Create Neptune client and execute different OpenCypher queries. """ config = Config(connect_timeout=10, read_timeout=30, retries={'max_attempts': 3}) neptune_client = boto3.client( "neptunedata", endpoint_url=NEPTUNE_ENDPOINT, config=config ) execute_open_cypher_query_without_params(neptune_client) execute_open_cypher_query_with_params(neptune_client) execute_open_cypher_explain_query(neptune_client) def execute_open_cypher_query_without_params(client): """ Executes a simple OpenCypher query without parameters. """ try: print("\nRunning OpenCypher query without parameters...") resp = client.execute_open_cypher_query( openCypherQuery="MATCH (n {code: 'ANC'}) RETURN n" ) print("Results:") print(resp['results']) except Exception as e: print(f"Error in simple OpenCypher query: {str(e)}") def execute_open_cypher_query_with_params(client): """ Executes an OpenCypher query using parameters. """ try: print("\nRunning OpenCypher query with parameters...") parameters = {'code': 'ANC'} resp = client.execute_open_cypher_query( openCypherQuery="MATCH (n {code: $code}) RETURN n", parameters=json.dumps(parameters) ) print("Results:") print(resp['results']) except Exception as e: print(f"Error in parameterized OpenCypher query: {str(e)}") def execute_open_cypher_explain_query(client): """ Runs an OpenCypher EXPLAIN query in debug mode. """ try: print("\nRunning OpenCypher EXPLAIN query (debug mode)...") resp = client.execute_open_cypher_explain_query( openCypherQuery="MATCH (n {code: 'ANC'}) RETURN n", explainMode="details" ) results = resp.get('results') if results is None: print("No explain results returned.") else: try: print("Explain Results:") print(results.read().decode('UTF-8')) except Exception as e: print(f"Error in OpenCypher EXPLAIN query: {str(e)}") except ClientError as e: print(f"Neptune error: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"BotoCore error: {str(e)}") except Exception as e: print(f"Unexpected error: {str(e)}") if __name__ == "__main__": main()

다음 코드 예시는 ExecuteQuery의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

""" Running this example. ---------------------------------------------------------------------------------- VPC Networking Requirement: ---------------------------------------------------------------------------------- Amazon Neptune must be accessed from **within the same VPC** as the Neptune cluster. It does not expose a public endpoint, so this code must be executed from: - An **AWS Lambda function** configured to run inside the same VPC - An **EC2 instance** or **ECS task** running in the same VPC - A connected environment such as a **VPN**, **AWS Direct Connect**, or a **peered VPC** """ GRAPH_ID = "<your-graph-id>" def main(): config = Config(retries={"total_max_attempts": 1, "mode": "standard"}, read_timeout=None) client = boto3.client("neptune-graph", config=config) try: print("\n--- Running OpenCypher query without parameters ---") run_open_cypher_query(client, GRAPH_ID) print("\n--- Running OpenCypher query with parameters ---") run_open_cypher_query_with_params(client, GRAPH_ID) print("\n--- Running OpenCypher explain query ---") run_open_cypher_explain_query(client, GRAPH_ID) except Exception as e: print(f"Unexpected error in main: {e}") def run_open_cypher_query(client, graph_id): """ Run an OpenCypher query without parameters. """ try: resp = client.execute_query( graphIdentifier=graph_id, queryString="MATCH (n {code: 'ANC'}) RETURN n", language='OPEN_CYPHER' ) print(resp['payload'].read().decode('UTF-8')) except client.exceptions.InternalServerException as e: print(f"InternalServerException: {e.response['Error']['Message']}") except ClientError as e: print(f"ClientError: {e.response['Error']['Message']}") except Exception as e: # <--- ADD THIS BLOCK print(f"Unexpected error: {e}") def run_open_cypher_query_with_params(client, graph_id): """ Run an OpenCypher query with parameters. """ try: parameters = {'code': 'ANC'} resp = client.execute_query( graphIdentifier=graph_id, queryString="MATCH (n {code: $code}) RETURN n", language='OPEN_CYPHER', parameters=parameters ) print(resp['payload'].read().decode('UTF-8')) except client.exceptions.InternalServerException as e: print(f"InternalServerException: {e.response['Error']['Message']}") except ClientError as e: print(f"ClientError: {e.response['Error']['Message']}") except Exception as e: # <--- ADD THIS BLOCK print(f"Unexpected error: {e}") def run_open_cypher_explain_query(client, graph_id): """ Run an OpenCypher explain query (explainMode = "debug"). """ try: resp = client.execute_query( graphIdentifier=graph_id, queryString="MATCH (n {code: 'ANC'}) RETURN n", language='OPEN_CYPHER', explainMode='DETAILS' ) print(resp['payload'].read().decode('UTF-8')) except ClientError as e: print(f"Neptune error: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"Unexpected Boto3 error: {str(e)}") except Exception as e: # <-- Add this generic catch print(f"Unexpected error: {str(e)}") if __name__ == "__main__": main()
  • API 세부 정보는 SDK for Python (Boto3) API 참조의 ExecuteQuery를 참조하세요. AWS

다음 코드 예시는 StartDBCluster의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def start_db_cluster(neptune_client, cluster_identifier: str): """ Starts an Amazon Neptune DB cluster and waits until it reaches 'available'. Args: neptune_client (boto3.client): The Neptune client. cluster_identifier (str): The DB cluster identifier. Raises: ClientError: Propagates AWS API issues like resource not found. RuntimeError: If cluster doesn't reach 'available' within timeout. """ try: # Initial wait in case the cluster was just stopped time.sleep(30) neptune_client.start_db_cluster(DBClusterIdentifier=cluster_identifier) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't start DB cluster. Here's why: {code}: {message}") raise start_time = time.time() paginator = neptune_client.get_paginator('describe_db_clusters') while True: try: pages = paginator.paginate(DBClusterIdentifier=cluster_identifier) clusters = [] for page in pages: clusters.extend(page.get('DBClusters', [])) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFound": print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.") else: print(f"Couldn't describe DB cluster. Here's why: {code}: {message}") raise status = clusters[0].get('Status') if clusters else None elapsed = time.time() - start_time print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True) if status and status.lower() == 'available': print(f"\n🎉 Cluster '{cluster_identifier}' is available.") return if elapsed > TIMEOUT_SECONDS: raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to become available.") time.sleep(POLL_INTERVAL_SECONDS)
  • API 세부 정보는 SDK for Python (Boto3) API 참조의 StartDBCluster를 참조하세요. AWS

다음 코드 예시는 StopDBCluster의 사용 방법을 보여 줍니다.

SDK for Python (Boto3)
참고

GitHub에 더 많은 내용이 있습니다. AWS 코드 예 리포지토리에서 전체 예를 찾고 설정 및 실행하는 방법을 배워보세요.

def stop_db_cluster(neptune_client, cluster_identifier: str): """ Stops an Amazon Neptune DB cluster and waits until it's fully stopped. Args: neptune_client (boto3.client): The Neptune client. cluster_identifier (str): The DB cluster identifier. Raises: ClientError: For AWS API errors (e.g., resource not found). RuntimeError: If the cluster doesn't stop within the timeout. """ try: neptune_client.stop_db_cluster(DBClusterIdentifier=cluster_identifier) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "AccessDeniedException": print("Access denied. Please ensure you have the necessary permissions.") else: print(f"Couldn't stop DB cluster. Here's why: {code}: {message}") raise start_time = time.time() paginator = neptune_client.get_paginator('describe_db_clusters') while True: try: pages = paginator.paginate(DBClusterIdentifier=cluster_identifier) clusters = [] for page in pages: clusters.extend(page.get('DBClusters', [])) except ClientError as err: code = err.response["Error"]["Code"] message = err.response["Error"]["Message"] if code == "DBClusterNotFound": print(f"Cluster '{cluster_identifier}' not found while polling. It may have been deleted.") else: print(f"Couldn't describe DB cluster. Here's why: {code}: {message}") raise status = clusters[0].get('Status') if clusters else None elapsed = time.time() - start_time print(f"\rElapsed: {int(elapsed)}s – Cluster status: {status}", end="", flush=True) if status and status.lower() == 'stopped': print(f"\nCluster '{cluster_identifier}' is now stopped.") return if elapsed > TIMEOUT_SECONDS: raise RuntimeError(f"Timeout waiting for cluster '{cluster_identifier}' to stop.") time.sleep(POLL_INTERVAL_SECONDS)
  • API 세부 정보는 SDK for Python (Boto3) API 참조의 StopDBCluster를 참조하세요. AWS