Python での並列処理を使用した大きなアーカイブのダウンロード - Amazon Glacier

このページは、ボールトと 2012 年リリース当時の REST API を使用する、Amazon Glacier サービスの既存のお客様のみを対象としています。

アーカイブストレージソリューションをお探しの場合は、Amazon S3 の Amazon Glacier ストレージクラス (S3 Glacier Instant Retrieval、S3 Glacier Flexible Retrieval、S3 Glacier Deep Archive) を使用することをお勧めします。これらのストレージオプションの詳細については、「Amazon Glacier ストレージクラス」を参照してください。

Amazon Glacier (元のスタンドアロンのボールトベースのサービス) は、2025 年 12 月 15 日以降、新規のお客様を受け入れなくなります。既存のお客様に影響はありません。Amazon Glacier は、ボールトにデータを保存する独自の API を備えたスタンドアロンサービスであり、Amazon S3 および Amazon S3 Glacier ストレージクラスとは異なります。既存のデータは Amazon Glacier で無期限に安全性が確保され、引き続きアクセス可能です。移行は必要ありません。低コストの長期アーカイブストレージをお探しの場合、AWS は Amazon S3 Glacier ストレージクラスを推奨します。このストレージクラスは、コストを抑えながら、S3 バケットベースの API、AWS リージョン全体での可用性、AWS サービス統合により、優れたカスタマーエクスペリエンスを提供します。拡張機能が必要な場合は、Amazon Glacier ボールトから Amazon S3 Glacier ストレージクラスにデータを転送するための AWS ソリューションガイダンスを使用して、Amazon S3 Glacier ストレージクラスへの移行を検討してください。

Python での並列処理を使用した大きなアーカイブのダウンロード

このトピックでは、Python での並列処理を使用して Amazon S3 Glacier (S3 Glacier) から大きなアーカイブをダウンロードする方法について説明します。この方法では、個別に処理できるように細かく分割することで、どのサイズのアーカイブも確実にダウンロードできます。

概要

この例で提供されている Python スクリプトは、次のタスクを実行します。

  1. 通知に必要な AWS リソース (Amazon SNS トピックと Amazon SQS キュー) を設定します。

  2. Amazon Glacier でアーカイブ取得ジョブを開始します

  3. ジョブ完了通知の Amazon SQS キューをモニタリングします

  4. 大きなアーカイブを管理しやすいチャンクに分割します

  5. 複数のワーカースレッドを使用してチャンクを並行してダウンロードします

  6. 後で再組み立てできるように各チャンクをディスクに保存します

前提条件

開始する前に、以下の準備が整っていることを確認します。

  • Python 3.6 以降がインストールされていること。

  • AWS SDK for Python (Boto3) をインストールするには

  • AWS Amazon Glacier、Amazon SNS、Amazon SQS の適切なアクセス許可で設定された 認証情報

  • ダウンロードしたアーカイブチャンクを保存するのに十分なディスク容量

例: Python での並列処理を使用したアーカイブのダウンロード

次の Python スクリプトは、並列処理を使用して Amazon Glacier から大きなアーカイブをダウンロードする方法を示しています。

import boto3 import time import json import jmespath import re import concurrent.futures import os output_file_path = "output_directory_path" vault_name = "vault_name" chunk_size = 1000000000 #1gb - size of chunks for parallel download. notify_queue_name = 'GlacierJobCompleteNotifyQueue' # SQS queue for Glacier recall notification chunk_download_queue_name='GlacierChunkReadyNotifyQueue' # SQS queue for chunks sns_topic_name = 'GlacierRecallJobCompleted' # the SNS topic to be notified when Glacier archive is restored. chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted. region = 'us-east-1' archive_id = "archive_id_to_restore" retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue. workers = 12 # the number of parallel worker threads for downloading chunks. def setup_queues_and_topic(): sqs = boto3.client('sqs') sns = boto3.client('sns') # Create the SNS topic topic_response = sns.create_topic( Name=sns_topic_name ) topic_arn = topic_response['TopicArn'] print("Creating the SNS topic " + topic_arn) # Create the notification queue notify_queue_response = sqs.create_queue( QueueName=notify_queue_name, Attributes={ 'VisibilityTimeout': '300', # 5 minutes 'ReceiveMessageWaitTimeSeconds': '20' # Enable long polling } ) notify_queue_url = notify_queue_response['QueueUrl'] print("Creating the archive-retrieval notification queue " + notify_queue_url) # Create the chunk download queue chunk_queue_response = sqs.create_queue( QueueName=chunk_download_queue_name, Attributes={ 'VisibilityTimeout': str(chunk_queue_visibility_timeout), # 5 minutes 'ReceiveMessageWaitTimeSeconds': '0' } ) chunk_queue_url = chunk_queue_response['QueueUrl'] print("Creating the chunk ready notification queue " + chunk_queue_url) # Get the ARN for the notification queue notify_queue_attributes = sqs.get_queue_attributes( QueueUrl=notify_queue_url, AttributeNames=['QueueArn'] ) notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn'] # Set up the SNS topic policy on the notification queue queue_policy = { "Version": "2012-10-17", "Statement": [{ "Sid": "allow-sns-messages", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": notify_queue_arn, "Condition": { "ArnEquals": { "aws:SourceArn": topic_arn } } }] } # Set the queue policy sqs.set_queue_attributes( QueueUrl=notify_queue_url, Attributes={ 'Policy': json.dumps(queue_policy) } ) # Subscribe the notification queue to the SNS topic sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=notify_queue_arn ) return { 'topic_arn': topic_arn, 'notify_queue_url': notify_queue_url, 'chunk_queue_url': chunk_queue_url } def split_and_send_chunks(archive_size, job_id,chunk_queue_url): ranges = [] current = 0 chunk_number = 0 while current < archive_size: chunk_number += 1 next_range = min(current + chunk_size - 1, archive_size - 1) ranges.append((current, next_range, chunk_number)) current = next_range + 1 # Send messages to SQS queue for start, end, chunk_number in ranges: body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number} body = json.dumps(body) print("Sending SQS message for range:" + str(body)) response = sqs.send_message( QueueUrl=chunk_queue_url, MessageBody=str(body) ) def GetJobOutputChunks(job_id, byterange, chunk_number): glacier = boto3.client('glacier') response = glacier.get_job_output( vaultName=vault_name, jobId=job_id, range=byterange, ) with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file: output_file.write(response['body'].read()) return response def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url): response = sqs.receive_message( QueueUrl=notify_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=20, MessageAttributeNames=['Message'] ) print("Polling archive retrieval job ready queue...") # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: print("Received a message from the archive retrieval job queue") jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) jsonresponse=json.loads(jsonresponse['Message']) if 'ArchiveSizeInBytes' in jsonresponse: receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['ArchiveSizeInBytes']: archive_size = jsonresponse['ArchiveSizeInBytes'] print(f'Received message: {response}') if archive_size > chunk_size: split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url) sqs.delete_message( QueueUrl=notify_queue_url, ReceiptHandle=receipt_handle) else: print("No ArchiveSizeInBytes value found in message") print(response) else: print('No messages available in the queue at this time.') time.sleep(1) def ReceiveArchiveChunkMessages(chunk_queue_url): response = sqs.receive_message( QueueUrl=chunk_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=0, MessageAttributeNames=['Message'] ) print("Polling archive chunk queue...") print(response) # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) if 'job_id' in jsonresponse: #checking that there is a job id before continuing job_id = jsonresponse['job_id'] byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end']) chunk_number = jsonresponse['chunk_number'] receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['job_id']: print(f'Received message: {response}') GetJobOutputChunks(job_id,byterange,chunk_number) sqs.delete_message( QueueUrl=chunk_queue_url, ReceiptHandle=receipt_handle) else: print('No messages available in the chunk queue at this time.') def initiate_archive_retrieval(archive_id, topic_arn): glacier = boto3.client('glacier') job_parameters = { "Type": "archive-retrieval", "ArchiveId": archive_id, "Description": "Archive retrieval job", "SNSTopic": topic_arn, "Tier": "Bulk" # You can change this to "Standard" or "Expedited" based on your needs } try: response = glacier.initiate_job( vaultName=vault_name, jobParameters=job_parameters ) print("Archive retrieval job initiated:") print(f"Job ID: {response['jobId']}") print(f"Job parameters: {job_parameters}") print(f"Complete response: {json.dumps(response, indent=2)}") return response['jobId'] except Exception as e: print(f"Error initiating archive retrieval job: {str(e)}") raise def run_async_tasks(chunk_queue_url, workers): max_workers = workers # Set the desired maximum number of concurrent tasks with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for _ in range(max_workers): executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url) # One time setup of the necessary queues and topics. queue_and_topic_atts = setup_queues_and_topic() topic_arn = queue_and_topic_atts['topic_arn'] notify_queue_url = queue_and_topic_atts['notify_queue_url'] chunk_queue_url = queue_and_topic_atts['chunk_queue_url'] if retrieve_archive: print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn) job_id = initiate_archive_retrieval(archive_id, topic_arn) else: print("Retrieve archive is false, polling queues and downloading only.") while True: ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url) run_async_tasks(chunk_queue_url,workers)

スクリプトを使用する

このスクリプトを使用するには、次のステップに従います。

  1. スクリプトのプレースホルダー値をユーザー固有の情報に置き換えます。

    • output_file_path: チャンクファイルが保存されるディレクトリ

    • vault_name: S3 Glacier ボールトの名前

    • notify_queue_name: ジョブ通知キューの名前

    • chunk_download_queue_name: チャンクダウンロードキューの名前

    • sns_topic_name: SNS トピックの名前

    • region: ボールトがある AWS リージョン

    • archive_id: 取得するアーカイブの ID

  2. スクリプトを実行します。

    python download_large_archive.py
  3. すべてのチャンクをダウンロードしたら、次のようなコマンドを使用して 1 つのファイルにまとめることができます。

    cat /path/to/chunks/*.chunk > complete_archive.file

重要な考慮事項

このスクリプトを使用する場合は、次の点に留意してください。

  • 選択した取得階層によって、S3 Glacier からのアーカイブの取得を完了するまでに数時間かかる場合があります。

  • スクリプトは無期限に実行され、キューを継続的にポーリングします。特定の要件に基づいて終了条件を追加することもできます。

  • アーカイブのすべてのチャンクを保存するのに十分なディスク容量があることを確認します。

  • スクリプトが中断された場合は、retrieve_archive=False で再起動して、新しい取得ジョブを開始せずにチャンクのダウンロードを続行できます。

  • ネットワーク帯域幅とシステムリソースに基づいて、chunk_size パラメータと workers パラメータを調整します。

  • Amazon S3 の取得、Amazon SNS、Amazon SQS の使用には、標準の AWS 料金が適用されます。