Python での並列処理を使用した大規模なアーカイブのダウンロード - Amazon Glacier

このページは、Vaults と 2012 年の元の REST API を使用する Amazon Glacier サービスの既存のお客様専用です。

アーカイブストレージソリューションをお探しの場合は、Amazon Glacier Amazon S3、S3 Glacier Flexible Retrieval、S3 S3 Glacier Deep Archive の Amazon Glacier ストレージクラスを使用することをお勧めします。これらのストレージオプションの詳細については、Amazon Glacier ストレージクラス」を参照してください。

Amazon Glacier (元のスタンドアロンボールトベースのサービス) は、2025 年 12 月 15 日以降、既存の顧客に影響を与えずに新規顧客を受け入れなくなります。Amazon Glacier は、ボールトにデータを保存する独自の APIs を備えたスタンドアロンサービスであり、Amazon S3 および Amazon S3 Glacier ストレージクラスとは異なります。既存のデータは Amazon Glacier で無期限に安全でアクセス可能です。移行は必要ありません。低コストの長期アーカイブストレージの場合、 は Amazon S3 Glacier ストレージクラス AWS を推奨します。これにより、S3 バケットベースの APIs、フル AWS リージョン 可用性、低コスト、 AWS サービス統合で優れたカスタマーエクスペリエンスを実現できます。拡張機能が必要な場合は、Amazon Glacier ボールトから Amazon S3 Glacier ストレージクラスにデータを転送するためのソリューションガイダンスを使用して、Amazon S3 Glacier ストレージクラスへの移行を検討してください。 AWS Amazon Glacier Amazon S3

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Python での並列処理を使用した大規模なアーカイブのダウンロード

このトピックでは、Python で並列処理を使用して Amazon S3 Glacier (S3 Glacier) から大きなアーカイブをダウンロードする方法について説明します。このアプローチにより、個別に処理できる小さな部分に分割することで、任意のサイズのアーカイブを確実にダウンロードできます。

概要

この例で提供されている Python スクリプトは、次のタスクを実行します。

  1. 通知に必要な AWS リソース (Amazon SNS トピックと Amazon SQS キュー) を設定します。

  2. Amazon Glacier でアーカイブ取得ジョブを開始します

  3. ジョブ完了通知の Amazon SQS キューをモニタリングします

  4. 大きなアーカイブを管理可能なチャンクに分割します

  5. 複数のワーカースレッドを使用してチャンクを並行してダウンロードします

  6. 後で再組み立てできるように各チャンクをディスクに保存します

前提条件

開始する前に、以下があることを確認してください。

  • Python 3.6 以降がインストールされている

  • AWS SDK for Python (Boto3) がインストールされている

  • AWS Amazon Glacier、Amazon SNS、Amazon SQS の適切なアクセス許可で設定された 認証情報

  • ダウンロードしたアーカイブチャンクを保存するのに十分なディスク容量

例: Python での並列処理を使用したアーカイブのダウンロード

次の Python スクリプトは、並列処理を使用して Amazon Glacier から大きなアーカイブをダウンロードする方法を示しています。

import boto3 import time import json import jmespath import re import concurrent.futures import os output_file_path = "output_directory_path" vault_name = "vault_name" chunk_size = 1000000000 #1gb - size of chunks for parallel download. notify_queue_name = 'GlacierJobCompleteNotifyQueue' # SQS queue for Glacier recall notification chunk_download_queue_name='GlacierChunkReadyNotifyQueue' # SQS queue for chunks sns_topic_name = 'GlacierRecallJobCompleted' # the SNS topic to be notified when Glacier archive is restored. chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted. region = 'us-east-1' archive_id = "archive_id_to_restore" retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue. workers = 12 # the number of parallel worker threads for downloading chunks. def setup_queues_and_topic(): sqs = boto3.client('sqs') sns = boto3.client('sns') # Create the SNS topic topic_response = sns.create_topic( Name=sns_topic_name ) topic_arn = topic_response['TopicArn'] print("Creating the SNS topic " + topic_arn) # Create the notification queue notify_queue_response = sqs.create_queue( QueueName=notify_queue_name, Attributes={ 'VisibilityTimeout': '300', # 5 minutes 'ReceiveMessageWaitTimeSeconds': '20' # Enable long polling } ) notify_queue_url = notify_queue_response['QueueUrl'] print("Creating the archive-retrieval notification queue " + notify_queue_url) # Create the chunk download queue chunk_queue_response = sqs.create_queue( QueueName=chunk_download_queue_name, Attributes={ 'VisibilityTimeout': str(chunk_queue_visibility_timeout), # 5 minutes 'ReceiveMessageWaitTimeSeconds': '0' } ) chunk_queue_url = chunk_queue_response['QueueUrl'] print("Creating the chunk ready notification queue " + chunk_queue_url) # Get the ARN for the notification queue notify_queue_attributes = sqs.get_queue_attributes( QueueUrl=notify_queue_url, AttributeNames=['QueueArn'] ) notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn'] # Set up the SNS topic policy on the notification queue queue_policy = { "Version": "2012-10-17", "Statement": [{ "Sid": "allow-sns-messages", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": notify_queue_arn, "Condition": { "ArnEquals": { "aws:SourceArn": topic_arn } } }] } # Set the queue policy sqs.set_queue_attributes( QueueUrl=notify_queue_url, Attributes={ 'Policy': json.dumps(queue_policy) } ) # Subscribe the notification queue to the SNS topic sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=notify_queue_arn ) return { 'topic_arn': topic_arn, 'notify_queue_url': notify_queue_url, 'chunk_queue_url': chunk_queue_url } def split_and_send_chunks(archive_size, job_id,chunk_queue_url): ranges = [] current = 0 chunk_number = 0 while current < archive_size: chunk_number += 1 next_range = min(current + chunk_size - 1, archive_size - 1) ranges.append((current, next_range, chunk_number)) current = next_range + 1 # Send messages to SQS queue for start, end, chunk_number in ranges: body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number} body = json.dumps(body) print("Sending SQS message for range:" + str(body)) response = sqs.send_message( QueueUrl=chunk_queue_url, MessageBody=str(body) ) def GetJobOutputChunks(job_id, byterange, chunk_number): glacier = boto3.client('glacier') response = glacier.get_job_output( vaultName=vault_name, jobId=job_id, range=byterange, ) with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file: output_file.write(response['body'].read()) return response def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url): response = sqs.receive_message( QueueUrl=notify_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=20, MessageAttributeNames=['Message'] ) print("Polling archive retrieval job ready queue...") # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: print("Received a message from the archive retrieval job queue") jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) jsonresponse=json.loads(jsonresponse['Message']) if 'ArchiveSizeInBytes' in jsonresponse: receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['ArchiveSizeInBytes']: archive_size = jsonresponse['ArchiveSizeInBytes'] print(f'Received message: {response}') if archive_size > chunk_size: split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url) sqs.delete_message( QueueUrl=notify_queue_url, ReceiptHandle=receipt_handle) else: print("No ArchiveSizeInBytes value found in message") print(response) else: print('No messages available in the queue at this time.') time.sleep(1) def ReceiveArchiveChunkMessages(chunk_queue_url): response = sqs.receive_message( QueueUrl=chunk_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=0, MessageAttributeNames=['Message'] ) print("Polling archive chunk queue...") print(response) # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) if 'job_id' in jsonresponse: #checking that there is a job id before continuing job_id = jsonresponse['job_id'] byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end']) chunk_number = jsonresponse['chunk_number'] receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['job_id']: print(f'Received message: {response}') GetJobOutputChunks(job_id,byterange,chunk_number) sqs.delete_message( QueueUrl=chunk_queue_url, ReceiptHandle=receipt_handle) else: print('No messages available in the chunk queue at this time.') def initiate_archive_retrieval(archive_id, topic_arn): glacier = boto3.client('glacier') job_parameters = { "Type": "archive-retrieval", "ArchiveId": archive_id, "Description": "Archive retrieval job", "SNSTopic": topic_arn, "Tier": "Bulk" # You can change this to "Standard" or "Expedited" based on your needs } try: response = glacier.initiate_job( vaultName=vault_name, jobParameters=job_parameters ) print("Archive retrieval job initiated:") print(f"Job ID: {response['jobId']}") print(f"Job parameters: {job_parameters}") print(f"Complete response: {json.dumps(response, indent=2)}") return response['jobId'] except Exception as e: print(f"Error initiating archive retrieval job: {str(e)}") raise def run_async_tasks(chunk_queue_url, workers): max_workers = workers # Set the desired maximum number of concurrent tasks with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for _ in range(max_workers): executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url) # One time setup of the necessary queues and topics. queue_and_topic_atts = setup_queues_and_topic() topic_arn = queue_and_topic_atts['topic_arn'] notify_queue_url = queue_and_topic_atts['notify_queue_url'] chunk_queue_url = queue_and_topic_atts['chunk_queue_url'] if retrieve_archive: print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn) job_id = initiate_archive_retrieval(archive_id, topic_arn) else: print("Retrieve archive is false, polling queues and downloading only.") while True: ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url) run_async_tasks(chunk_queue_url,workers)

スクリプトの使用

このスクリプトを使用するには、次の手順に従います。

  1. スクリプトのプレースホルダー値を特定の情報に置き換えます。

    • output_file_path: チャンクファイルが保存されるディレクトリ

    • vault_name: S3 Glacier ボールトの名前

    • notify_queue_name: ジョブ通知キューの名前

    • chunk_download_queue_name: チャンクダウンロードキューの名前

    • sns_topic_name: SNS トピックの名前

    • region: ボールトがある AWS リージョン

    • archive_id: 取得するアーカイブの ID

  2. スクリプトを実行します。

    python download_large_archive.py
  3. すべてのチャンクをダウンロードしたら、次のようなコマンドを使用して 1 つのファイルにまとめることができます。

    cat /path/to/chunks/*.chunk > complete_archive.file

重要な考慮事項

このスクリプトを使用する場合は、次の点に注意してください。

  • S3 Glacier からのアーカイブの取得は、選択した取得階層によっては完了までに数時間かかる場合があります。

  • スクリプトは無期限に実行され、キューを継続的にポーリングします。特定の要件に基づいて終了条件を追加することもできます。

  • アーカイブのすべてのチャンクを保存するのに十分なディスク容量があることを確認します。

  • スクリプトが中断された場合は、 で再起動retrieve_archive=Falseして、新しい取得ジョブを開始せずにチャンクのダウンロードを続行できます。

  • ネットワーク帯域幅とシステムリソースに基づいて、chunk_size パラメータと workers パラメータを調整します。

  • Amazon S3 の取得、Amazon SNS、Amazon SQS の使用には、標準 AWS 料金が適用されます。