AWS 文档 SDK 示例
使用 SDK for Python (Boto3) 的 HealthImaging 示例
以下代码示例演示如何通过将 适用于 Python (Boto3) 的 AWS SDK与 HealthImaging 结合使用,来执行操作和实现常见场景。
操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。
场景是向您演示如何通过在一个服务中调用多个函数或与其他 AWS 服务 结合来完成特定任务的代码示例。
每个示例都包含一个指向完整源代码的链接,您可以从中找到有关如何在上下文中设置和运行代码的说明。
开始使用
以下代码示例演示了如何开始使用 HealthImaging。
- 适用于 Python 的 SDK (Boto3)
-
import logging import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) def hello_medical_imaging(medical_imaging_client): """ Use the AWS SDK for Python (Boto3) to create an AWS HealthImaging client and list the data stores in your account. This example uses the default settings specified in your shared credentials and config files. :param medical_imaging_client: A Boto3 AWS HealthImaging Client object. """ print("Hello, Amazon Health Imaging! Let's list some of your data stores:\n") try: paginator = medical_imaging_client.get_paginator("list_datastores") page_iterator = paginator.paginate() datastore_summaries = [] for page in page_iterator: datastore_summaries.extend(page["datastoreSummaries"]) print("\tData Stores:") for ds in datastore_summaries: print(f"\t\tDatastore: {ds['datastoreName']} ID {ds['datastoreId']}") except ClientError as err: logger.error( "Couldn't list data stores. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise if __name__ == "__main__": hello_medical_imaging(boto3.client("medical-imaging"))-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 ListDatastores。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
操作
以下代码示例演示如何使用 CopyImageSet。
- 适用于 Python 的 SDK (Boto3)
-
用于复制映像集的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def copy_image_set( self, datastore_id, image_set_id, version_id, destination_image_set_id=None, destination_version_id=None, force=False, subsets=[], ): """ Copy an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param version_id: The ID of the image set version. :param destination_image_set_id: The ID of the optional destination image set. :param destination_version_id: The ID of the optional destination image set version. :param force: Force the copy. :param subsets: The optional subsets to copy. For example: ["12345678901234567890123456789012"]. :return: The copied image set ID. """ try: copy_image_set_information = { "sourceImageSet": {"latestVersionId": version_id} } if destination_image_set_id and destination_version_id: copy_image_set_information["destinationImageSet"] = { "imageSetId": destination_image_set_id, "latestVersionId": destination_version_id, } if len(subsets) > 0: copySubsetsJson = { "SchemaVersion": "1.1", "Study": {"Series": {"imageSetId": {"Instances": {}}}}, } for subset in subsets: copySubsetsJson["Study"]["Series"]["imageSetId"]["Instances"][ subset ] = {} copy_image_set_information["sourceImageSet"]["DICOMCopies"] = { "copiableAttributes": json.dumps(copySubsetsJson) } copy_results = self.health_imaging_client.copy_image_set( datastoreId=datastore_id, sourceImageSetId=image_set_id, copyImageSetInformation=copy_image_set_information, force=force, ) except ClientError as err: logger.error( "Couldn't copy image set. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return copy_results["destinationImageSetProperties"]["imageSetId"]复制没有目标的映像集。
copy_image_set_information = { "sourceImageSet": {"latestVersionId": version_id} } copy_results = self.health_imaging_client.copy_image_set( datastoreId=datastore_id, sourceImageSetId=image_set_id, copyImageSetInformation=copy_image_set_information, force=force, )复制带有目标的映像集。
copy_image_set_information = { "sourceImageSet": {"latestVersionId": version_id} } if destination_image_set_id and destination_version_id: copy_image_set_information["destinationImageSet"] = { "imageSetId": destination_image_set_id, "latestVersionId": destination_version_id, } copy_results = self.health_imaging_client.copy_image_set( datastoreId=datastore_id, sourceImageSetId=image_set_id, copyImageSetInformation=copy_image_set_information, force=force, )复制映像集的子集。
copy_image_set_information = { "sourceImageSet": {"latestVersionId": version_id} } if len(subsets) > 0: copySubsetsJson = { "SchemaVersion": "1.1", "Study": {"Series": {"imageSetId": {"Instances": {}}}}, } for subset in subsets: copySubsetsJson["Study"]["Series"]["imageSetId"]["Instances"][ subset ] = {} copy_image_set_information["sourceImageSet"]["DICOMCopies"] = { "copiableAttributes": json.dumps(copySubsetsJson) } copy_results = self.health_imaging_client.copy_image_set( datastoreId=datastore_id, sourceImageSetId=image_set_id, copyImageSetInformation=copy_image_set_information, force=force, )以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 CopyImageSet。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 CreateDatastore。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def create_datastore(self, name): """ Create a data store. :param name: The name of the data store to create. :return: The data store ID. """ try: data_store = self.health_imaging_client.create_datastore(datastoreName=name) except ClientError as err: logger.error( "Couldn't create data store %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return data_store["datastoreId"]以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 CreateDatastore。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 DeleteDatastore。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def delete_datastore(self, datastore_id): """ Delete a data store. :param datastore_id: The ID of the data store. """ try: self.health_imaging_client.delete_datastore(datastoreId=datastore_id) except ClientError as err: logger.error( "Couldn't delete data store %s. Here's why: %s: %s", datastore_id, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 DeleteDatastore。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 DeleteImageSet。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def delete_image_set(self, datastore_id, image_set_id): """ Delete an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :return: The delete results. """ try: delete_results = self.health_imaging_client.delete_image_set( imageSetId=image_set_id, datastoreId=datastore_id ) except ClientError as err: logger.error( "Couldn't delete image set. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return delete_results以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 DeleteImageSet。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 GetDICOMImportJob。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def get_dicom_import_job(self, datastore_id, job_id): """ Get the properties of a DICOM import job. :param datastore_id: The ID of the data store. :param job_id: The ID of the job. :return: The job properties. """ try: job = self.health_imaging_client.get_dicom_import_job( jobId=job_id, datastoreId=datastore_id ) except ClientError as err: logger.error( "Couldn't get DICOM import job. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return job["jobProperties"]以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 GetDICOMImportJob。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 GetDatastore。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def get_datastore_properties(self, datastore_id): """ Get the properties of a data store. :param datastore_id: The ID of the data store. :return: The data store properties. """ try: data_store = self.health_imaging_client.get_datastore( datastoreId=datastore_id ) except ClientError as err: logger.error( "Couldn't get data store %s. Here's why: %s: %s", id, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return data_store["datastoreProperties"]以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 GetDatastore。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 GetImageFrame。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def get_pixel_data( self, file_path_to_write, datastore_id, image_set_id, image_frame_id ): """ Get an image frame's pixel data. :param file_path_to_write: The path to write the image frame's HTJ2K encoded pixel data. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param image_frame_id: The ID of the image frame. """ try: image_frame = self.health_imaging_client.get_image_frame( datastoreId=datastore_id, imageSetId=image_set_id, imageFrameInformation={"imageFrameId": image_frame_id}, ) with open(file_path_to_write, "wb") as f: for chunk in image_frame["imageFrameBlob"].iter_chunks(): if chunk: f.write(chunk) except ClientError as err: logger.error( "Couldn't get image frame. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 GetImageFrame。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 GetImageSet。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def get_image_set(self, datastore_id, image_set_id, version_id=None): """ Get the properties of an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param version_id: The optional version of the image set. :return: The image set properties. """ try: if version_id: image_set = self.health_imaging_client.get_image_set( imageSetId=image_set_id, datastoreId=datastore_id, versionId=version_id, ) else: image_set = self.health_imaging_client.get_image_set( imageSetId=image_set_id, datastoreId=datastore_id ) except ClientError as err: logger.error( "Couldn't get image set. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return image_set以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 GetImageSet。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 GetImageSetMetadata。
- 适用于 Python 的 SDK (Boto3)
-
用于获取映像集元数据的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def get_image_set_metadata( self, metadata_file, datastore_id, image_set_id, version_id=None ): """ Get the metadata of an image set. :param metadata_file: The file to store the JSON gzipped metadata. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param version_id: The version of the image set. """ try: if version_id: image_set_metadata = self.health_imaging_client.get_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id, versionId=version_id, ) else: image_set_metadata = self.health_imaging_client.get_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id ) print(image_set_metadata) with open(metadata_file, "wb") as f: for chunk in image_set_metadata["imageSetMetadataBlob"].iter_chunks(): if chunk: f.write(chunk) except ClientError as err: logger.error( "Couldn't get image metadata. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise获取没有版本的映像集元数据。
image_set_metadata = self.health_imaging_client.get_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id )获取带有版本的映像集元数据。
image_set_metadata = self.health_imaging_client.get_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id, versionId=version_id, )以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 GetImageSetMetadata。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 ListDICOMImportJobs。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def list_dicom_import_jobs(self, datastore_id): """ List the DICOM import jobs. :param datastore_id: The ID of the data store. :return: The list of jobs. """ try: paginator = self.health_imaging_client.get_paginator( "list_dicom_import_jobs" ) page_iterator = paginator.paginate(datastoreId=datastore_id) job_summaries = [] for page in page_iterator: job_summaries.extend(page["jobSummaries"]) except ClientError as err: logger.error( "Couldn't list DICOM import jobs. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return job_summaries以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 ListDICOMImportJobs。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 ListDatastores。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def list_datastores(self): """ List the data stores. :return: The list of data stores. """ try: paginator = self.health_imaging_client.get_paginator("list_datastores") page_iterator = paginator.paginate() datastore_summaries = [] for page in page_iterator: datastore_summaries.extend(page["datastoreSummaries"]) except ClientError as err: logger.error( "Couldn't list data stores. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return datastore_summaries以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 ListDatastores。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 ListImageSetVersions。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def list_image_set_versions(self, datastore_id, image_set_id): """ List the image set versions. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :return: The list of image set versions. """ try: paginator = self.health_imaging_client.get_paginator( "list_image_set_versions" ) page_iterator = paginator.paginate( imageSetId=image_set_id, datastoreId=datastore_id ) image_set_properties_list = [] for page in page_iterator: image_set_properties_list.extend(page["imageSetPropertiesList"]) except ClientError as err: logger.error( "Couldn't list image set versions. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return image_set_properties_list以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 ListImageSetVersions。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 ListTagsForResource。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def list_tags_for_resource(self, resource_arn): """ List the tags for a resource. :param resource_arn: The ARN of the resource. :return: The list of tags. """ try: tags = self.health_imaging_client.list_tags_for_resource( resourceArn=resource_arn ) except ClientError as err: logger.error( "Couldn't list tags for resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return tags["tags"]以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 ListTagsForResource。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 SearchImageSets。
- 适用于 Python 的 SDK (Boto3)
-
用于搜索映像集的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def search_image_sets(self, datastore_id, search_filter): """ Search for image sets. :param datastore_id: The ID of the data store. :param search_filter: The search filter. For example: {"filters" : [{ "operator": "EQUAL", "values": [{"DICOMPatientId": "3524578"}]}]}. :return: The list of image sets. """ try: paginator = self.health_imaging_client.get_paginator("search_image_sets") page_iterator = paginator.paginate( datastoreId=datastore_id, searchCriteria=search_filter ) metadata_summaries = [] for page in page_iterator: metadata_summaries.extend(page["imageSetsMetadataSummaries"]) except ClientError as err: logger.error( "Couldn't search image sets. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return metadata_summaries使用案例 #1:EQUAL 运算符。
search_filter = { "filters": [ {"operator": "EQUAL", "values": [{"DICOMPatientId": patient_id}]} ] } image_sets = self.search_image_sets(data_store_id, search_filter) print(f"Image sets found with EQUAL operator\n{image_sets}")使用案例 #2:使用 DICOMStudyDate 和 DICOMStudyTime 的 BETWEEN 运算符。
search_filter = { "filters": [ { "operator": "BETWEEN", "values": [ { "DICOMStudyDateAndTime": { "DICOMStudyDate": "19900101", "DICOMStudyTime": "000000", } }, { "DICOMStudyDateAndTime": { "DICOMStudyDate": "20230101", "DICOMStudyTime": "000000", } }, ], } ] } image_sets = self.search_image_sets(data_store_id, search_filter) print( f"Image sets found with BETWEEN operator using DICOMStudyDate and DICOMStudyTime\n{image_sets}" )使用案例 #3:使用 createdAt 的 BETWEEN 运算符。时间研究以前一直存在。
search_filter = { "filters": [ { "values": [ { "createdAt": datetime.datetime( 2021, 8, 4, 14, 49, 54, 429000 ) }, { "createdAt": datetime.datetime.now() + datetime.timedelta(days=1) }, ], "operator": "BETWEEN", } ] } recent_image_sets = self.search_image_sets(data_store_id, search_filter) print( f"Image sets found with with BETWEEN operator using createdAt\n{recent_image_sets}" )使用案例 #4:对 DICOMSeriesInstanceUID 使用 EQUAL 运算符,对 updatedAt 使用 BETWEEN,并对 updatedAt 字段按照 ASC 顺序为响应排序。
search_filter = { "filters": [ { "values": [ { "updatedAt": datetime.datetime( 2021, 8, 4, 14, 49, 54, 429000 ) }, { "updatedAt": datetime.datetime.now() + datetime.timedelta(days=1) }, ], "operator": "BETWEEN", }, { "values": [{"DICOMSeriesInstanceUID": series_instance_uid}], "operator": "EQUAL", }, ], "sort": { "sortOrder": "ASC", "sortField": "updatedAt", }, } image_sets = self.search_image_sets(data_store_id, search_filter) print( "Image sets found with EQUAL operator on DICOMSeriesInstanceUID and BETWEEN on updatedAt and" ) print(f"sort response in ASC order on updatedAt field\n{image_sets}")以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 SearchImageSets。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 StartDICOMImportJob。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def start_dicom_import_job( self, job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri ): """ Start a DICOM import job. :param job_name: The name of the job. :param datastore_id: The ID of the data store. :param role_arn: The Amazon Resource Name (ARN) of the role to use for the job. :param input_s3_uri: The S3 bucket input prefix path containing the DICOM files. :param output_s3_uri: The S3 bucket output prefix path for the result. :return: The job ID. """ try: job = self.health_imaging_client.start_dicom_import_job( jobName=job_name, datastoreId=datastore_id, dataAccessRoleArn=role_arn, inputS3Uri=input_s3_uri, outputS3Uri=output_s3_uri, ) except ClientError as err: logger.error( "Couldn't start DICOM import job. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return job["jobId"]以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 StartDICOMImportJob。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 TagResource。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def tag_resource(self, resource_arn, tags): """ Tag a resource. :param resource_arn: The ARN of the resource. :param tags: The tags to apply. """ try: self.health_imaging_client.tag_resource(resourceArn=resource_arn, tags=tags) except ClientError as err: logger.error( "Couldn't tag resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 TagResource。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 UntagResource。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def untag_resource(self, resource_arn, tag_keys): """ Untag a resource. :param resource_arn: The ARN of the resource. :param tag_keys: The tag keys to remove. """ try: self.health_imaging_client.untag_resource( resourceArn=resource_arn, tagKeys=tag_keys ) except ClientError as err: logger.error( "Couldn't untag resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 UntagResource。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示如何使用 UpdateImageSetMetadata。
- 适用于 Python 的 SDK (Boto3)
-
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def update_image_set_metadata( self, datastore_id, image_set_id, version_id, metadata, force=False ): """ Update the metadata of an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param version_id: The ID of the image set version. :param metadata: The image set metadata as a dictionary. For example {"DICOMUpdates": {"updatableAttributes": "{\"SchemaVersion\":1.1,\"Patient\":{\"DICOM\":{\"PatientName\":\"Garcia^Gloria\"}}}"}} :param: force: Force the update. :return: The updated image set metadata. """ try: updated_metadata = self.health_imaging_client.update_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id, latestVersionId=version_id, updateImageSetMetadataUpdates=metadata, force=force, ) except ClientError as err: logger.error( "Couldn't update image set metadata. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return updated_metadata以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)使用案例 #1:插入或更新属性。
attributes = """{ "SchemaVersion": 1.1, "Study": { "DICOM": { "StudyDescription": "CT CHEST" } } }""" metadata = {"DICOMUpdates": {"updatableAttributes": attributes}} self.update_image_set_metadata( data_store_id, image_set_id, version_id, metadata, force )使用案例 #2:移除属性。
# Attribute key and value must match the existing attribute. attributes = """{ "SchemaVersion": 1.1, "Study": { "DICOM": { "StudyDescription": "CT CHEST" } } }""" metadata = {"DICOMUpdates": {"removableAttributes": attributes}} self.update_image_set_metadata( data_store_id, image_set_id, version_id, metadata, force )使用案例 #3:移除实例。
attributes = """{ "SchemaVersion": 1.1, "Study": { "Series": { "1.1.1.1.1.1.12345.123456789012.123.12345678901234.1": { "Instances": { "1.1.1.1.1.1.12345.123456789012.123.12345678901234.1": {} } } } } }""" metadata = {"DICOMUpdates": {"removableAttributes": attributes}} self.update_image_set_metadata( data_store_id, image_set_id, version_id, metadata, force )使用案例 #4:恢复到早期版本。
metadata = {"revertToVersionId": "1"} self.update_image_set_metadata( data_store_id, image_set_id, version_id, metadata, force )-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 UpdateImageSetMetadata。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
场景
以下代码示例演示如何在 HealthImaging 中导入 DICOM 文件和下载映像帧。
该实现构造为命令行应用程序。
设置 DICOM 导入的资源。
将 DICOM 文件导入数据存储中。
检索导入任务的影像集 ID。
检索影像集的影像帧 ID。
下载、解码并验证影像帧。
清理资源。
- 适用于 Python 的 SDK (Boto3)
-
使用必要的资源创建 CloudFormation 堆栈。
def deploy(self): """ Deploys prerequisite resources used by the scenario. The resources are defined in the associated `setup.yaml` AWS CloudFormation script and are deployed as a CloudFormation stack, so they can be easily managed and destroyed. """ print("\t\tLet's deploy the stack for resource creation.") stack_name = q.ask("\t\tEnter a name for the stack: ", q.non_empty) data_store_name = q.ask( "\t\tEnter a name for the Health Imaging Data Store: ", q.non_empty ) account_id = boto3.client("sts").get_caller_identity()["Account"] with open( "../../../../scenarios/features/healthimaging_image_sets/resources/cfn_template.yaml" ) as setup_file: setup_template = setup_file.read() print(f"\t\tCreating {stack_name}.") stack = self.cf_resource.create_stack( StackName=stack_name, TemplateBody=setup_template, Capabilities=["CAPABILITY_NAMED_IAM"], Parameters=[ { "ParameterKey": "datastoreName", "ParameterValue": data_store_name, }, { "ParameterKey": "userAccountID", "ParameterValue": account_id, }, ], ) print("\t\tWaiting for stack to deploy. This typically takes a minute or two.") waiter = self.cf_resource.meta.client.get_waiter("stack_create_complete") waiter.wait(StackName=stack.name) stack.load() print(f"\t\tStack status: {stack.stack_status}") outputs_dictionary = { output["OutputKey"]: output["OutputValue"] for output in stack.outputs } self.input_bucket_name = outputs_dictionary["BucketName"] self.output_bucket_name = outputs_dictionary["BucketName"] self.role_arn = outputs_dictionary["RoleArn"] self.data_store_id = outputs_dictionary["DatastoreID"] return stack将 DICOM 文件复制到 Amazon S3 导入桶。
def copy_single_object(self, key, source_bucket, target_bucket, target_directory): """ Copies a single object from a source to a target bucket. :param key: The key of the object to copy. :param source_bucket: The source bucket for the copy. :param target_bucket: The target bucket for the copy. :param target_directory: The target directory for the copy. """ new_key = target_directory + "/" + key copy_source = {"Bucket": source_bucket, "Key": key} self.s3_client.copy_object( CopySource=copy_source, Bucket=target_bucket, Key=new_key ) print(f"\n\t\tCopying {key}.") def copy_images( self, source_bucket, source_directory, target_bucket, target_directory ): """ Copies the images from the source to the target bucket using multiple threads. :param source_bucket: The source bucket for the images. :param source_directory: Directory within the source bucket. :param target_bucket: The target bucket for the images. :param target_directory: Directory within the target bucket. """ # Get list of all objects in source bucket. list_response = self.s3_client.list_objects_v2( Bucket=source_bucket, Prefix=source_directory ) objs = list_response["Contents"] keys = [obj["Key"] for obj in objs] # Copy the objects in the bucket. for key in keys: self.copy_single_object(key, source_bucket, target_bucket, target_directory) print("\t\tDone copying all objects.")将 DICOM 文件导入 Amazon S3 数据存储。
class MedicalImagingWrapper: """Encapsulates AWS HealthImaging functionality.""" def __init__(self, medical_imaging_client, s3_client): """ :param medical_imaging_client: A Boto3 Amazon MedicalImaging client. :param s3_client: A Boto3 S3 client. """ self.medical_imaging_client = medical_imaging_client self.s3_client = s3_client @classmethod def from_client(cls): medical_imaging_client = boto3.client("medical-imaging") s3_client = boto3.client("s3") return cls(medical_imaging_client, s3_client) def start_dicom_import_job( self, data_store_id, input_bucket_name, input_directory, output_bucket_name, output_directory, role_arn, ): """ Routine which starts a HealthImaging import job. :param data_store_id: The HealthImaging data store ID. :param input_bucket_name: The name of the Amazon S3 bucket containing the DICOM files. :param input_directory: The directory in the S3 bucket containing the DICOM files. :param output_bucket_name: The name of the S3 bucket for the output. :param output_directory: The directory in the S3 bucket to store the output. :param role_arn: The ARN of the IAM role with permissions for the import. :return: The job ID of the import. """ input_uri = f"s3://{input_bucket_name}/{input_directory}/" output_uri = f"s3://{output_bucket_name}/{output_directory}/" try: job = self.medical_imaging_client.start_dicom_import_job( jobName="examplejob", datastoreId=data_store_id, dataAccessRoleArn=role_arn, inputS3Uri=input_uri, outputS3Uri=output_uri, ) except ClientError as err: logger.error( "Couldn't start DICOM import job. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return job["jobId"]获取由 DICOM 导入任务创建的影像集。
class MedicalImagingWrapper: """Encapsulates AWS HealthImaging functionality.""" def __init__(self, medical_imaging_client, s3_client): """ :param medical_imaging_client: A Boto3 Amazon MedicalImaging client. :param s3_client: A Boto3 S3 client. """ self.medical_imaging_client = medical_imaging_client self.s3_client = s3_client @classmethod def from_client(cls): medical_imaging_client = boto3.client("medical-imaging") s3_client = boto3.client("s3") return cls(medical_imaging_client, s3_client) def get_image_sets_for_dicom_import_job(self, datastore_id, import_job_id): """ Retrieves the image sets created for an import job. :param datastore_id: The HealthImaging data store ID :param import_job_id: The import job ID :return: List of image set IDs """ import_job = self.medical_imaging_client.get_dicom_import_job( datastoreId=datastore_id, jobId=import_job_id ) output_uri = import_job["jobProperties"]["outputS3Uri"] bucket = output_uri.split("/")[2] key = "/".join(output_uri.split("/")[3:]) # Try to get the manifest. retries = 3 while retries > 0: try: obj = self.s3_client.get_object( Bucket=bucket, Key=key + "job-output-manifest.json" ) body = obj["Body"] break except ClientError as error: retries = retries - 1 time.sleep(3) try: data = json.load(body) expression = jmespath.compile("jobSummary.imageSetsSummary[].imageSetId") image_sets = expression.search(data) except json.decoder.JSONDecodeError as error: image_sets = import_job["jobProperties"] return image_sets def get_image_set(self, datastore_id, image_set_id, version_id=None): """ Get the properties of an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param version_id: The optional version of the image set. :return: The image set properties. """ try: if version_id: image_set = self.medical_imaging_client.get_image_set( imageSetId=image_set_id, datastoreId=datastore_id, versionId=version_id, ) else: image_set = self.medical_imaging_client.get_image_set( imageSetId=image_set_id, datastoreId=datastore_id ) except ClientError as err: logger.error( "Couldn't get image set. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return image_set获取影像集的影像帧信息。
class MedicalImagingWrapper: """Encapsulates AWS HealthImaging functionality.""" def __init__(self, medical_imaging_client, s3_client): """ :param medical_imaging_client: A Boto3 Amazon MedicalImaging client. :param s3_client: A Boto3 S3 client. """ self.medical_imaging_client = medical_imaging_client self.s3_client = s3_client @classmethod def from_client(cls): medical_imaging_client = boto3.client("medical-imaging") s3_client = boto3.client("s3") return cls(medical_imaging_client, s3_client) def get_image_frames_for_image_set(self, datastore_id, image_set_id, out_directory): """ Get the image frames for an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param out_directory: The directory to save the file. :return: The image frames. """ image_frames = [] file_name = os.path.join(out_directory, f"{image_set_id}_metadata.json.gzip") file_name = file_name.replace("/", "\\\\") self.get_image_set_metadata(file_name, datastore_id, image_set_id) try: with gzip.open(file_name, "rb") as f_in: doc = json.load(f_in) instances = jmespath.search("Study.Series.*.Instances[].*[]", doc) for instance in instances: rescale_slope = jmespath.search("DICOM.RescaleSlope", instance) rescale_intercept = jmespath.search("DICOM.RescaleIntercept", instance) image_frames_json = jmespath.search("ImageFrames[][]", instance) for image_frame in image_frames_json: checksum_json = jmespath.search( "max_by(PixelDataChecksumFromBaseToFullResolution, &Width)", image_frame, ) image_frame_info = { "imageSetId": image_set_id, "imageFrameId": image_frame["ID"], "rescaleIntercept": rescale_intercept, "rescaleSlope": rescale_slope, "minPixelValue": image_frame["MinPixelValue"], "maxPixelValue": image_frame["MaxPixelValue"], "fullResolutionChecksum": checksum_json["Checksum"], } image_frames.append(image_frame_info) return image_frames except TypeError: return {} except ClientError as err: logger.error( "Couldn't get image frames for image set. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return image_frames def get_image_set_metadata( self, metadata_file, datastore_id, image_set_id, version_id=None ): """ Get the metadata of an image set. :param metadata_file: The file to store the JSON gzipped metadata. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param version_id: The version of the image set. """ try: if version_id: image_set_metadata = self.medical_imaging_client.get_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id, versionId=version_id, ) else: image_set_metadata = self.medical_imaging_client.get_image_set_metadata( imageSetId=image_set_id, datastoreId=datastore_id ) with open(metadata_file, "wb") as f: for chunk in image_set_metadata["imageSetMetadataBlob"].iter_chunks(): if chunk: f.write(chunk) except ClientError as err: logger.error( "Couldn't get image metadata. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise下载、解码并验证映像帧。
class MedicalImagingWrapper: """Encapsulates AWS HealthImaging functionality.""" def __init__(self, medical_imaging_client, s3_client): """ :param medical_imaging_client: A Boto3 Amazon MedicalImaging client. :param s3_client: A Boto3 S3 client. """ self.medical_imaging_client = medical_imaging_client self.s3_client = s3_client @classmethod def from_client(cls): medical_imaging_client = boto3.client("medical-imaging") s3_client = boto3.client("s3") return cls(medical_imaging_client, s3_client) def get_pixel_data( self, file_path_to_write, datastore_id, image_set_id, image_frame_id ): """ Get an image frame's pixel data. :param file_path_to_write: The path to write the image frame's HTJ2K encoded pixel data. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. :param image_frame_id: The ID of the image frame. """ try: image_frame = self.medical_imaging_client.get_image_frame( datastoreId=datastore_id, imageSetId=image_set_id, imageFrameInformation={"imageFrameId": image_frame_id}, ) with open(file_path_to_write, "wb") as f: for chunk in image_frame["imageFrameBlob"].iter_chunks(): f.write(chunk) except ClientError as err: logger.error( "Couldn't get image frame. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def download_decode_and_check_image_frames( self, data_store_id, image_frames, out_directory ): """ Downloads image frames, decodes them, and uses the checksum to validate the decoded images. :param data_store_id: The HealthImaging data store ID. :param image_frames: A list of dicts containing image frame information. :param out_directory: A directory for the downloaded images. :return: True if the function succeeded; otherwise, False. """ total_result = True for image_frame in image_frames: image_file_path = f"{out_directory}/image_{image_frame['imageFrameId']}.jph" self.get_pixel_data( image_file_path, data_store_id, image_frame["imageSetId"], image_frame["imageFrameId"], ) image_array = self.jph_image_to_opj_bitmap(image_file_path) crc32_checksum = image_frame["fullResolutionChecksum"] # Verify checksum. crc32_calculated = zlib.crc32(image_array) image_result = crc32_checksum == crc32_calculated print( f"\t\tImage checksum verified for {image_frame['imageFrameId']}: {image_result }" ) total_result = total_result and image_result return total_result @staticmethod def jph_image_to_opj_bitmap(jph_file): """ Decode the image to a bitmap using an OPENJPEG library. :param jph_file: The file to decode. :return: The decoded bitmap as an array. """ # Use format 2 for the JPH file. params = openjpeg.utils.get_parameters(jph_file, 2) print(f"\n\t\tImage parameters for {jph_file}: \n\t\t{params}") image_array = openjpeg.utils.decode(jph_file, 2) return image_array清理资源。
def destroy(self, stack): """ Destroys the resources managed by the CloudFormation stack, and the CloudFormation stack itself. :param stack: The CloudFormation stack that manages the example resources. """ print(f"\t\tCleaning up resources and {stack.name}.") data_store_id = None for oput in stack.outputs: if oput["OutputKey"] == "DatastoreID": data_store_id = oput["OutputValue"] if data_store_id is not None: print(f"\t\tDeleting image sets in data store {data_store_id}.") image_sets = self.medical_imaging_wrapper.search_image_sets( data_store_id, {} ) image_set_ids = [image_set["imageSetId"] for image_set in image_sets] for image_set_id in image_set_ids: self.medical_imaging_wrapper.delete_image_set( data_store_id, image_set_id ) print(f"\t\tDeleted image set with id : {image_set_id}") print(f"\t\tDeleting {stack.name}.") stack.delete() print("\t\tWaiting for stack removal. This may take a few minutes.") waiter = self.cf_resource.meta.client.get_waiter("stack_delete_complete") waiter.wait(StackName=stack.name) print("\t\tStack delete complete.") class MedicalImagingWrapper: """Encapsulates AWS HealthImaging functionality.""" def __init__(self, medical_imaging_client, s3_client): """ :param medical_imaging_client: A Boto3 Amazon MedicalImaging client. :param s3_client: A Boto3 S3 client. """ self.medical_imaging_client = medical_imaging_client self.s3_client = s3_client @classmethod def from_client(cls): medical_imaging_client = boto3.client("medical-imaging") s3_client = boto3.client("s3") return cls(medical_imaging_client, s3_client) def search_image_sets(self, datastore_id, search_filter): """ Search for image sets. :param datastore_id: The ID of the data store. :param search_filter: The search filter. For example: {"filters" : [{ "operator": "EQUAL", "values": [{"DICOMPatientId": "3524578"}]}]}. :return: The list of image sets. """ try: paginator = self.medical_imaging_client.get_paginator("search_image_sets") page_iterator = paginator.paginate( datastoreId=datastore_id, searchCriteria=search_filter ) metadata_summaries = [] for page in page_iterator: metadata_summaries.extend(page["imageSetsMetadataSummaries"]) except ClientError as err: logger.error( "Couldn't search image sets. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return metadata_summaries def delete_image_set(self, datastore_id, image_set_id): """ Delete an image set. :param datastore_id: The ID of the data store. :param image_set_id: The ID of the image set. """ try: delete_results = self.medical_imaging_client.delete_image_set( imageSetId=image_set_id, datastoreId=datastore_id ) except ClientError as err: logger.error( "Couldn't delete image set. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的以下主题。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示了如何标记 HealthImaging 数据存储。
- 适用于 Python 的 SDK (Boto3)
-
标记数据存储。
a_data_store_arn = "arn:aws:medical-imaging:us-east-1:123456789012:datastore/12345678901234567890123456789012" medical_imaging_wrapper.tag_resource(data_store_arn, {"Deployment": "Development"})用于标记资源的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def tag_resource(self, resource_arn, tags): """ Tag a resource. :param resource_arn: The ARN of the resource. :param tags: The tags to apply. """ try: self.health_imaging_client.tag_resource(resourceArn=resource_arn, tags=tags) except ClientError as err: logger.error( "Couldn't tag resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise列出数据存储的标签。
a_data_store_arn = "arn:aws:medical-imaging:us-east-1:123456789012:datastore/12345678901234567890123456789012" medical_imaging_wrapper.list_tags_for_resource(data_store_arn)用于列出资源标签的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def list_tags_for_resource(self, resource_arn): """ List the tags for a resource. :param resource_arn: The ARN of the resource. :return: The list of tags. """ try: tags = self.health_imaging_client.list_tags_for_resource( resourceArn=resource_arn ) except ClientError as err: logger.error( "Couldn't list tags for resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return tags["tags"]取消标记数据存储。
a_data_store_arn = "arn:aws:medical-imaging:us-east-1:123456789012:datastore/12345678901234567890123456789012" medical_imaging_wrapper.untag_resource(data_store_arn, ["Deployment"])用于取消标记资源的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def untag_resource(self, resource_arn, tag_keys): """ Untag a resource. :param resource_arn: The ARN of the resource. :param tag_keys: The tag keys to remove. """ try: self.health_imaging_client.untag_resource( resourceArn=resource_arn, tagKeys=tag_keys ) except ClientError as err: logger.error( "Couldn't untag resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的以下主题。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -
以下代码示例演示了如何标记 HealthImaging 图像集。
- 适用于 Python 的 SDK (Boto3)
-
标记映像集。
an_image_set_arn = ( "arn:aws:medical-imaging:us-east-1:123456789012:datastore/12345678901234567890123456789012/" "imageset/12345678901234567890123456789012" ) medical_imaging_wrapper.tag_resource(image_set_arn, {"Deployment": "Development"})用于标记资源的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def tag_resource(self, resource_arn, tags): """ Tag a resource. :param resource_arn: The ARN of the resource. :param tags: The tags to apply. """ try: self.health_imaging_client.tag_resource(resourceArn=resource_arn, tags=tags) except ClientError as err: logger.error( "Couldn't tag resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise列出映像集的标签。
an_image_set_arn = ( "arn:aws:medical-imaging:us-east-1:123456789012:datastore/12345678901234567890123456789012/" "imageset/12345678901234567890123456789012" ) medical_imaging_wrapper.list_tags_for_resource(image_set_arn)用于列出资源标签的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def list_tags_for_resource(self, resource_arn): """ List the tags for a resource. :param resource_arn: The ARN of the resource. :return: The list of tags. """ try: tags = self.health_imaging_client.list_tags_for_resource( resourceArn=resource_arn ) except ClientError as err: logger.error( "Couldn't list tags for resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return tags["tags"]取消标记映像集。
an_image_set_arn = ( "arn:aws:medical-imaging:us-east-1:123456789012:datastore/12345678901234567890123456789012/" "imageset/12345678901234567890123456789012" ) medical_imaging_wrapper.untag_resource(image_set_arn, ["Deployment"])用于取消标记资源的实用程序函数。
class MedicalImagingWrapper: def __init__(self, health_imaging_client): self.health_imaging_client = health_imaging_client def untag_resource(self, resource_arn, tag_keys): """ Untag a resource. :param resource_arn: The ARN of the resource. :param tag_keys: The tag keys to remove. """ try: self.health_imaging_client.untag_resource( resourceArn=resource_arn, tagKeys=tag_keys ) except ClientError as err: logger.error( "Couldn't untag resource. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise以下代码实例化 MedicalImagingWrapper 对象。
client = boto3.client("medical-imaging") medical_imaging_wrapper = MedicalImagingWrapper(client)-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API Reference》中的以下主题。
注意
查看 GitHub,了解更多信息。在 AWS 代码示例存储库
中查找完整示例,了解如何进行设置和运行。 -