Downloading large archives using parallel processing with Python - Amazon Glacier

This page is only for existing customers of the Amazon Glacier service using Vaults and the original REST API from 2012.

If you are looking for an archival storage solution, we recommend the Amazon S3 Glacier storage classes in Amazon S3: S3 Glacier Instant Retrieval, S3 Glacier Flexible Retrieval, and S3 Glacier Deep Archive. To learn more about these storage options, see Amazon S3 Glacier storage classes.

Effective December 15, 2025, Amazon Glacier (the original standalone, vault-based service) will no longer accept new customers; existing customers are not affected. Amazon Glacier is a standalone service with its own APIs that stores data in vaults, and it is distinct from Amazon S3 and the Amazon S3 Glacier storage classes. Your existing data in Amazon Glacier remains secure and accessible indefinitely, and no migration is required. For low-cost, long-term archival storage, AWS recommends the Amazon S3 Glacier storage classes, which provide a superior customer experience through S3 bucket-based APIs, full AWS Region availability, lower costs, and integration with other AWS services. If you want this enhanced functionality, consider migrating your data from Amazon Glacier vaults to the Amazon S3 Glacier storage classes by using the AWS Solutions Guidance for transferring data from Amazon Glacier vaults to the Amazon S3 Glacier storage classes.


Downloading large archives using parallel processing with Python

This topic describes how to use parallel processing in Python to download large archives from Amazon S3 Glacier (S3 Glacier). This approach lets you reliably download archives of any size by breaking them into smaller chunks that can be processed independently.

Overview

The Python script in this example performs the following tasks:

  1. Sets up the required AWS resources (an Amazon SNS topic and Amazon SQS queues) for notifications

  2. Initiates an archive retrieval job with Amazon Glacier

  3. Monitors an Amazon SQS queue for job completion notifications

  4. Splits the large archive into manageable chunks (the byte-range logic is sketched after this list)

  5. Downloads the chunks in parallel using multiple worker threads

  6. Saves each chunk to disk for later reassembly
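The following minimal sketch illustrates the chunking idea behind steps 4 and 5: the archive is divided into inclusive byte ranges, each of which can be downloaded independently with a ranged retrieval. The 1 GB chunk size and the 3.5 GB archive size are illustrative values only; the full script later in this topic implements the same logic.

# Illustrative only: split an archive into inclusive byte ranges that can each
# be downloaded independently. The sizes below are example values.
chunk_size = 1000000000      # 1 GB per chunk
archive_size = 3500000000    # example: a 3.5 GB archive

ranges = []
current = 0
while current < archive_size:
    end = min(current + chunk_size - 1, archive_size - 1)
    ranges.append((current, end))
    current = end + 1

for start, end in ranges:
    # Each tuple maps directly to the range string used by the download call.
    print(f"bytes={start}-{end}")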

Prerequisites

Before you begin, make sure that you have:

  • Python 3.6 or later installed

  • The AWS SDK for Python (Boto3) installed

  • AWS credentials configured with the appropriate permissions for Amazon Glacier, Amazon SNS, and Amazon SQS (a quick way to verify your setup is sketched after this list)

  • Sufficient disk space to store the downloaded archive chunks
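The following optional snippet is one way to confirm that Boto3 is installed and that credentials resolve to an identity before you run the full script; it is not part of the original example, and it does not verify the specific Glacier, SNS, or SQS permissions themselves.

# Optional pre-flight check (illustrative): confirms that Boto3 is importable
# and that AWS credentials are available. The sts:GetCallerIdentity call works
# with any valid credentials.
import boto3

sts = boto3.client("sts")
identity = sts.get_caller_identity()
print("Boto3 version:", boto3.__version__)
print("Authenticated as:", identity["Arn"])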

Example: Download an archive using parallel processing with Python

The following Python script demonstrates how to download a large archive from Amazon Glacier using parallel processing:

import boto3
import time
import json
import concurrent.futures
import os

output_file_path = "output_directory_path"
vault_name = "vault_name"
chunk_size = 1000000000  # 1 GB - size of chunks for parallel download.
notify_queue_name = 'GlacierJobCompleteNotifyQueue'  # SQS queue for Glacier recall notification
chunk_download_queue_name = 'GlacierChunkReadyNotifyQueue'  # SQS queue for chunks
sns_topic_name = 'GlacierRecallJobCompleted'  # the SNS topic to be notified when the Glacier archive is restored.
chunk_queue_visibility_timeout = 7200  # 2 hours - this may need to be adjusted.
region = 'us-east-1'
archive_id = "archive_id_to_restore"
retrieve_archive = True  # set to False if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue.
workers = 12  # the number of parallel worker threads for downloading chunks.

# Shared SQS client used by the send and polling functions below.
sqs = boto3.client('sqs', region_name=region)

def setup_queues_and_topic():
    sns = boto3.client('sns', region_name=region)
    # Create the SNS topic
    topic_response = sns.create_topic(
        Name=sns_topic_name
    )
    topic_arn = topic_response['TopicArn']
    print("Creating the SNS topic " + topic_arn)
    # Create the notification queue
    notify_queue_response = sqs.create_queue(
        QueueName=notify_queue_name,
        Attributes={
            'VisibilityTimeout': '300',  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '20'  # Enable long polling
        }
    )
    notify_queue_url = notify_queue_response['QueueUrl']
    print("Creating the archive-retrieval notification queue " + notify_queue_url)
    # Create the chunk download queue
    chunk_queue_response = sqs.create_queue(
        QueueName=chunk_download_queue_name,
        Attributes={
            'VisibilityTimeout': str(chunk_queue_visibility_timeout),  # 2 hours by default
            'ReceiveMessageWaitTimeSeconds': '0'
        }
    )
    chunk_queue_url = chunk_queue_response['QueueUrl']
    print("Creating the chunk ready notification queue " + chunk_queue_url)
    # Get the ARN for the notification queue
    notify_queue_attributes = sqs.get_queue_attributes(
        QueueUrl=notify_queue_url,
        AttributeNames=['QueueArn']
    )
    notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn']
    # Set up the SNS topic policy on the notification queue
    queue_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Sid": "allow-sns-messages",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "SQS:SendMessage",
            "Resource": notify_queue_arn,
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": topic_arn
                }
            }
        }]
    }
    # Set the queue policy
    sqs.set_queue_attributes(
        QueueUrl=notify_queue_url,
        Attributes={
            'Policy': json.dumps(queue_policy)
        }
    )
    # Subscribe the notification queue to the SNS topic
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=notify_queue_arn
    )
    return {
        'topic_arn': topic_arn,
        'notify_queue_url': notify_queue_url,
        'chunk_queue_url': chunk_queue_url
    }

def split_and_send_chunks(archive_size, job_id, chunk_queue_url):
    # Build the list of inclusive byte ranges that cover the whole archive.
    ranges = []
    current = 0
    chunk_number = 0
    while current < archive_size:
        chunk_number += 1
        next_range = min(current + chunk_size - 1, archive_size - 1)
        ranges.append((current, next_range, chunk_number))
        current = next_range + 1
    # Send one message per chunk to the chunk download queue.
    for start, end, chunk_number in ranges:
        body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number}
        body = json.dumps(body)
        print("Sending SQS message for range:" + str(body))
        sqs.send_message(
            QueueUrl=chunk_queue_url,
            MessageBody=str(body)
        )

def GetJobOutputChunks(job_id, byterange, chunk_number):
    # Download one byte range of the retrieval job output and save it to disk.
    glacier = boto3.client('glacier', region_name=region)
    response = glacier.get_job_output(
        vaultName=vault_name,
        jobId=job_id,
        range=byterange,
    )
    with open(os.path.join(output_file_path, str(chunk_number) + ".chunk"), 'wb') as output_file:
        output_file.write(response['body'].read())
    return response

def ReceiveArchiveReadyMessages(notify_queue_url, chunk_queue_url):
    response = sqs.receive_message(
        QueueUrl=notify_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        MessageAttributeNames=['Message']
    )
    print("Polling archive retrieval job ready queue...")
    # Check that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty.
    if 'Messages' in response:
        print("Received a message from the archive retrieval job queue")
        # Load the message body into JSON and check that the ArchiveSizeInBytes key is present before continuing.
        jsonresponse = json.loads(response['Messages'][0]['Body'])
        jsonresponse = json.loads(jsonresponse['Message'])
        if 'ArchiveSizeInBytes' in jsonresponse:
            receipt_handle = response['Messages'][0]['ReceiptHandle']
            if jsonresponse['ArchiveSizeInBytes']:
                archive_size = jsonresponse['ArchiveSizeInBytes']
                print(f'Received message: {response}')
                if archive_size > chunk_size:
                    split_and_send_chunks(archive_size, jsonresponse['JobId'], chunk_queue_url)
                sqs.delete_message(
                    QueueUrl=notify_queue_url,
                    ReceiptHandle=receipt_handle)
        else:
            print("No ArchiveSizeInBytes value found in message")
            print(response)
    else:
        print('No messages available in the queue at this time.')
        time.sleep(1)

def ReceiveArchiveChunkMessages(chunk_queue_url):
    response = sqs.receive_message(
        QueueUrl=chunk_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=0,
        MessageAttributeNames=['Message']
    )
    print("Polling archive chunk queue...")
    print(response)
    # Check that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty.
    if 'Messages' in response:
        # Load the message body into JSON and check that the job_id key is present before continuing.
        jsonresponse = json.loads(response['Messages'][0]['Body'])
        if 'job_id' in jsonresponse:
            job_id = jsonresponse['job_id']
            byterange = "bytes=" + str(jsonresponse['start']) + '-' + str(jsonresponse['end'])
            chunk_number = jsonresponse['chunk_number']
            receipt_handle = response['Messages'][0]['ReceiptHandle']
            if jsonresponse['job_id']:
                print(f'Received message: {response}')
                GetJobOutputChunks(job_id, byterange, chunk_number)
                sqs.delete_message(
                    QueueUrl=chunk_queue_url,
                    ReceiptHandle=receipt_handle)
    else:
        print('No messages available in the chunk queue at this time.')

def initiate_archive_retrieval(archive_id, topic_arn):
    glacier = boto3.client('glacier', region_name=region)
    job_parameters = {
        "Type": "archive-retrieval",
        "ArchiveId": archive_id,
        "Description": "Archive retrieval job",
        "SNSTopic": topic_arn,
        "Tier": "Bulk"  # You can change this to "Standard" or "Expedited" based on your needs.
    }
    try:
        response = glacier.initiate_job(
            vaultName=vault_name,
            jobParameters=job_parameters
        )
        print("Archive retrieval job initiated:")
        print(f"Job ID: {response['jobId']}")
        print(f"Job parameters: {job_parameters}")
        print(f"Complete response: {json.dumps(response, indent=2)}")
        return response['jobId']
    except Exception as e:
        print(f"Error initiating archive retrieval job: {str(e)}")
        raise

def run_async_tasks(chunk_queue_url, workers):
    # Set the desired maximum number of concurrent download tasks.
    max_workers = workers
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for _ in range(max_workers):
            executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url)

# One-time setup of the necessary queues and topic.
queue_and_topic_atts = setup_queues_and_topic()
topic_arn = queue_and_topic_atts['topic_arn']
notify_queue_url = queue_and_topic_atts['notify_queue_url']
chunk_queue_url = queue_and_topic_atts['chunk_queue_url']

if retrieve_archive:
    print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: " + topic_arn)
    job_id = initiate_archive_retrieval(archive_id, topic_arn)
else:
    print("Retrieve archive is false, polling queues and downloading only.")

while True:
    ReceiveArchiveReadyMessages(notify_queue_url, chunk_queue_url)
    run_async_tasks(chunk_queue_url, workers)

Using the script

To use this script, follow these steps:

  1. Replace the placeholder values in the script with your specific information:

    • output_file_path: The directory where the chunk files will be saved

    • vault_name: The name of your S3 Glacier vault

    • notify_queue_name: The name of the job notification queue

    • chunk_download_queue_name: The name of the chunk download queue

    • sns_topic_name: The name of the SNS topic

    • region: The AWS Region where your vault is located

    • archive_id: The ID of the archive to retrieve

  2. Run the script:

    python download_large_archive.py
  3. After all chunks have been downloaded, you can combine them into a single file with a command like the following (note that the shell expands *.chunk in lexical order, so with 10 or more chunks the pieces can be concatenated out of order; a numeric-order helper is sketched after these steps):

    cat /path/to/chunks/*.chunk > complete_archive.file
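Because 10.chunk sorts before 2.chunk in a lexical glob expansion, the following sketch reassembles the chunk files in numeric order instead. The chunk_dir value is an assumption and should match the output_file_path used in the script; the output file name is likewise just an example.

# Reassemble the downloaded chunks in numeric order (1.chunk, 2.chunk, ...,
# 10.chunk, ...). chunk_dir should match output_file_path from the script.
import os

chunk_dir = "output_directory_path"
chunk_files = [f for f in os.listdir(chunk_dir) if f.endswith(".chunk")]
chunk_files.sort(key=lambda name: int(name.split(".")[0]))

with open(os.path.join(chunk_dir, "complete_archive.file"), "wb") as assembled:
    for name in chunk_files:
        with open(os.path.join(chunk_dir, name), "rb") as part:
            assembled.write(part.read())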

Important considerations

When using this script, keep the following points in mind:

  • Archive retrieval from S3 Glacier can take several hours to complete, depending on the retrieval tier you select.

  • The script runs indefinitely, continuously polling the queues. You might want to add a termination condition based on your specific requirements (one possible check is sketched after this list).

  • Make sure that you have enough disk space to store all chunks of the archive.

  • If the script is interrupted, you can restart it with retrieve_archive = False to continue downloading chunks without initiating a new retrieval job.

  • Adjust the chunk_size and workers parameters to match your network bandwidth and system resources.

  • Standard AWS charges apply for Amazon S3 Glacier retrievals, Amazon SNS, and Amazon SQS usage.
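The following sketch shows one possible termination check; it is not part of the original script, and the check counts and wait time are assumptions. It treats the download as finished when the chunk queue has reported no visible or in-flight messages over several consecutive checks, so the main loop could break once this function returns True after at least one batch of chunk messages has been processed.

# One possible termination check (not part of the original script): returns
# True when the chunk queue has had no visible or in-flight messages for
# several consecutive checks.
import time
import boto3

def chunk_queue_is_drained(sqs, chunk_queue_url, checks=5, wait_seconds=30):
    for _ in range(checks):
        attrs = sqs.get_queue_attributes(
            QueueUrl=chunk_queue_url,
            AttributeNames=[
                'ApproximateNumberOfMessages',
                'ApproximateNumberOfMessagesNotVisible',
            ],
        )['Attributes']
        if int(attrs['ApproximateNumberOfMessages']) > 0 or \
           int(attrs['ApproximateNumberOfMessagesNotVisible']) > 0:
            return False
        time.sleep(wait_seconds)
    return True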