使用 AWS SDK 获取 Amazon Glacier 档案内容并删除档案 - Amazon Glacier

此页面仅适用于使用文件库和 2012 年原始 REST API 的 Amazon Glacier 服务的现有客户。

如果您正在寻找归档存储解决方案,建议使用 Amazon S3 中的 Amazon Glacier 存储类别 S3 Glacier Instant Retrieval、S3 Glacier Flexible Retrieval 和 S3 Glacier Deep Archive。要了解有关这些存储选项的更多信息,请参阅 Amazon Glacier 存储类别

从 2025 年 12 月 15 日起,Amazon Glacier(最初基于独立文件库的服务)将不再接受新客户,对现有客户不存在任何影响。Amazon Glacier 是一项独立服务,拥有自己的 API,可将数据存储在文件库中,与 Amazon S3 和 Amazon S3 Glacier 存储类别不同。在 Amazon Glacier 中,您现有的数据将确保安全,并且可以无限期地访问。无需进行迁移。对于低成本、长期的存档存储,AWS 建议使用 Amazon S3 Glacier 存储类别,该类别可通过基于 S3 存储桶的 API、完全的 AWS 区域可用性、更低的成本和 AWS 服务集成,提供卓越的客户体验。如果您希望加强功能,可以考虑使用我们的 AWS 将数据从 Amazon Glacier 文件库传输到 Amazon S3 Glacier 存储类别的解决方案指南,迁移到 Amazon S3 Glacier 存储类别。

使用 AWS SDK 获取 Amazon Glacier 档案内容并删除档案

以下代码示例展示了如何:

  • 列出 Amazon Glacier 文件库的任务并获取任务状态。

  • 获取已完成的档案检索任务的输出。

  • 删除档案。

  • 删除文件库。

Python
适用于 Python 的 SDK(Boto3)
注意

查看 GitHub,了解更多信息。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

创建一个包装 Amazon Glacier 操作的类。

import argparse import logging import os import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) class GlacierWrapper: """Encapsulates Amazon S3 Glacier API operations.""" def __init__(self, glacier_resource): """ :param glacier_resource: A Boto3 Amazon S3 Glacier resource. """ self.glacier_resource = glacier_resource @staticmethod def list_jobs(vault, job_type): """ Lists jobs by type for the specified vault. :param vault: The vault to query. :param job_type: The type of job to list. :return: The list of jobs of the requested type. """ job_list = [] try: if job_type == "all": jobs = vault.jobs.all() elif job_type == "in_progress": jobs = vault.jobs_in_progress.all() elif job_type == "completed": jobs = vault.completed_jobs.all() elif job_type == "succeeded": jobs = vault.succeeded_jobs.all() elif job_type == "failed": jobs = vault.failed_jobs.all() else: jobs = [] logger.warning("%s isn't a type of job I can get.", job_type) for job in jobs: job_list.append(job) logger.info("Got %s %s job %s.", job_type, job.action, job.id) except ClientError: logger.exception("Couldn't get %s jobs from %s.", job_type, vault.name) raise else: return job_list @staticmethod def get_job_output(job): """ Gets the output of a job, such as a vault inventory or the contents of an archive. :param job: The job to get output from. :return: The job output, in bytes. """ try: response = job.get_output() out_bytes = response["body"].read() logger.info("Read %s bytes from job %s.", len(out_bytes), job.id) if "archiveDescription" in response: logger.info( "These bytes are described as '%s'", response["archiveDescription"] ) except ClientError: logger.exception("Couldn't get output for job %s.", job.id) raise else: return out_bytes @staticmethod def delete_archive(archive): """ Deletes an archive from a vault. :param archive: The archive to delete. """ try: archive.delete() logger.info( "Deleted archive %s from vault %s.", archive.id, archive.vault_name ) except ClientError: logger.exception("Couldn't delete archive %s.", archive.id) raise @staticmethod def delete_vault(vault): """ Deletes a vault. :param vault: The vault to delete. """ try: vault.delete() logger.info("Deleted vault %s.", vault.name) except ClientError: logger.exception("Couldn't delete vault %s.", vault.name) raise

调用包装器类上的函数,以从已完成的任务中获取档案内容,然后删除档案。

def retrieve_demo(glacier, vault_name): """ Shows how to: * List jobs for a vault and get job status. * Get the output of a completed archive retrieval job. * Delete an archive. * Delete a vault. :param glacier: A Boto3 Amazon S3 Glacier resource. :param vault_name: The name of the vault to query for jobs. """ vault = glacier.glacier_resource.Vault("-", vault_name) try: vault.load() except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": print( f"\nVault {vault_name} doesn't exist. You must first run this script " f"with the --upload flag to create the vault." ) return else: raise print(f"\nGetting completed jobs for {vault.name}.") jobs = glacier.list_jobs(vault, "completed") if not jobs: print("\nNo completed jobs found. Give it some time and try again later.") return retrieval_job = None for job in jobs: if job.action == "ArchiveRetrieval" and job.status_code == "Succeeded": retrieval_job = job break if retrieval_job is None: print( "\nNo ArchiveRetrieval jobs found. Give it some time and try again " "later." ) return print(f"\nGetting output from job {retrieval_job.id}.") archive_bytes = glacier.get_job_output(retrieval_job) archive_str = archive_bytes.decode("utf-8") print("\nGot archive data. Printing the first 10 lines.") print(os.linesep.join(archive_str.split(os.linesep)[:10])) print(f"\nDeleting the archive from {vault.name}.") archive = glacier.glacier_resource.Archive( "-", vault.name, retrieval_job.archive_id ) glacier.delete_archive(archive) print(f"\nDeleting {vault.name}.") glacier.delete_vault(vault)

有关 AWS SDK 开发人员指南和代码示例的完整列表,请参阅将 Amazon Glacier 与 AWS SDK 结合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。