Herunterladen eines großen Archivs mithilfe der Parallelverarbeitung mit Python - Amazon Glacier

Diese Seite ist nur für Bestandskunden des Amazon Glacier-Service bestimmt, die Vaults und die ursprüngliche REST-API von 2012 verwenden.

Wenn Sie nach Archivspeicherlösungen suchen, empfehlen wir die Verwendung der Amazon Glacier-Speicherklassen in Amazon S3, S3 Glacier Instant Retrieval, S3 Glacier Flexible Retrieval und S3 Glacier Deep Archive. Weitere Informationen zu diesen Speicheroptionen finden Sie unter Amazon Glacier-Speicherklassen.

Amazon Glacier (ursprünglicher eigenständiger, vault-basierter Service) akzeptiert ab dem 15. Dezember 2025 keine Neukunden mehr, ohne dass dies Auswirkungen auf Bestandskunden hat. Amazon Glacier ist ein eigenständiger Service APIs , der Daten in Tresoren speichert und sich von den Speicherklassen Amazon S3 und Amazon S3 Glacier unterscheidet. Ihre vorhandenen Daten bleiben in Amazon Glacier auf unbestimmte Zeit sicher und zugänglich. Es ist keine Migration erforderlich. Für kostengünstige, langfristige Archivierungsspeicherung AWS empfiehlt sich die Amazon S3 Glacier-Speicherklasse, die mit S3-Bucket-Basis, voller AWS-Region Verfügbarkeit APIs, geringeren Kosten und AWS Serviceintegration ein hervorragendes Kundenerlebnis bieten. Wenn Sie erweiterte Funktionen wünschen, sollten Sie eine Migration zu Amazon S3 Glacier-Speicherklassen in Betracht ziehen, indem Sie unseren AWS Lösungsleitfaden für die Übertragung von Daten aus Amazon Glacier-Tresoren in Amazon S3 Glacier-Speicherklassen verwenden.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Herunterladen eines großen Archivs mithilfe der Parallelverarbeitung mit Python

In diesem Thema wird beschrieben, wie Sie mithilfe der Parallelverarbeitung mit Python ein großes Archiv von Amazon S3 Glacier (S3 Glacier) herunterladen. Mit diesem Ansatz können Sie Archive jeder Größe zuverlässig herunterladen, indem Sie sie in kleinere Teile aufteilen, die unabhängig voneinander verarbeitet werden können.

Übersicht

Das in diesem Beispiel bereitgestellte Python-Skript führt die folgenden Aufgaben aus:

  1. Richtet die erforderlichen AWS Ressourcen (Amazon SNS SNS-Thema und Amazon SQS SQS-Warteschlangen) für Benachrichtigungen ein

  2. Initiiert einen Job zum Abrufen von Archiven mit Amazon Glacier

  3. Überwacht eine Amazon SQS SQS-Warteschlange für Benachrichtigungen über den Abschluss von Jobs

  4. Teilt das große Archiv in verwaltbare Teile auf

  5. Lädt Chunks parallel unter Verwendung mehrerer Worker-Threads herunter

  6. Speichert jeden Block auf der Festplatte, damit er später wieder zusammengebaut werden kann

Voraussetzungen

Bevor Sie beginnen, stellen Sie sicher, dass Sie über Folgendes verfügen:

  • Python 3.6 oder höher installiert

  • AWS SDK for Python (Boto3) installiert

  • AWS Anmeldeinformationen, die mit den entsprechenden Berechtigungen für Amazon Glacier, Amazon SNS und Amazon SQS konfiguriert sind

  • Ausreichend Festplattenspeicher zum Speichern der heruntergeladenen Archivblöcke

Beispiel: Herunterladen eines Archivs mithilfe von Parallelverarbeitung mit Python

Das folgende Python-Skript veranschaulicht, wie ein großes Archiv mithilfe von Parallelverarbeitung von Amazon Glacier heruntergeladen wird:

import boto3 import time import json import jmespath import re import concurrent.futures import os output_file_path = "output_directory_path" vault_name = "vault_name" chunk_size = 1000000000 #1gb - size of chunks for parallel download. notify_queue_name = 'GlacierJobCompleteNotifyQueue' # SQS queue for Glacier recall notification chunk_download_queue_name='GlacierChunkReadyNotifyQueue' # SQS queue for chunks sns_topic_name = 'GlacierRecallJobCompleted' # the SNS topic to be notified when Glacier archive is restored. chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted. region = 'us-east-1' archive_id = "archive_id_to_restore" retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue. workers = 12 # the number of parallel worker threads for downloading chunks. def setup_queues_and_topic(): sqs = boto3.client('sqs') sns = boto3.client('sns') # Create the SNS topic topic_response = sns.create_topic( Name=sns_topic_name ) topic_arn = topic_response['TopicArn'] print("Creating the SNS topic " + topic_arn) # Create the notification queue notify_queue_response = sqs.create_queue( QueueName=notify_queue_name, Attributes={ 'VisibilityTimeout': '300', # 5 minutes 'ReceiveMessageWaitTimeSeconds': '20' # Enable long polling } ) notify_queue_url = notify_queue_response['QueueUrl'] print("Creating the archive-retrieval notification queue " + notify_queue_url) # Create the chunk download queue chunk_queue_response = sqs.create_queue( QueueName=chunk_download_queue_name, Attributes={ 'VisibilityTimeout': str(chunk_queue_visibility_timeout), # 5 minutes 'ReceiveMessageWaitTimeSeconds': '0' } ) chunk_queue_url = chunk_queue_response['QueueUrl'] print("Creating the chunk ready notification queue " + chunk_queue_url) # Get the ARN for the notification queue notify_queue_attributes = sqs.get_queue_attributes( QueueUrl=notify_queue_url, AttributeNames=['QueueArn'] ) notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn'] # Set up the SNS topic policy on the notification queue queue_policy = { "Version": "2012-10-17", "Statement": [{ "Sid": "allow-sns-messages", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": notify_queue_arn, "Condition": { "ArnEquals": { "aws:SourceArn": topic_arn } } }] } # Set the queue policy sqs.set_queue_attributes( QueueUrl=notify_queue_url, Attributes={ 'Policy': json.dumps(queue_policy) } ) # Subscribe the notification queue to the SNS topic sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=notify_queue_arn ) return { 'topic_arn': topic_arn, 'notify_queue_url': notify_queue_url, 'chunk_queue_url': chunk_queue_url } def split_and_send_chunks(archive_size, job_id,chunk_queue_url): ranges = [] current = 0 chunk_number = 0 while current < archive_size: chunk_number += 1 next_range = min(current + chunk_size - 1, archive_size - 1) ranges.append((current, next_range, chunk_number)) current = next_range + 1 # Send messages to SQS queue for start, end, chunk_number in ranges: body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number} body = json.dumps(body) print("Sending SQS message for range:" + str(body)) response = sqs.send_message( QueueUrl=chunk_queue_url, MessageBody=str(body) ) def GetJobOutputChunks(job_id, byterange, chunk_number): glacier = boto3.client('glacier') response = glacier.get_job_output( vaultName=vault_name, jobId=job_id, range=byterange, ) with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file: output_file.write(response['body'].read()) return response def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url): response = sqs.receive_message( QueueUrl=notify_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=20, MessageAttributeNames=['Message'] ) print("Polling archive retrieval job ready queue...") # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: print("Received a message from the archive retrieval job queue") jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) jsonresponse=json.loads(jsonresponse['Message']) if 'ArchiveSizeInBytes' in jsonresponse: receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['ArchiveSizeInBytes']: archive_size = jsonresponse['ArchiveSizeInBytes'] print(f'Received message: {response}') if archive_size > chunk_size: split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url) sqs.delete_message( QueueUrl=notify_queue_url, ReceiptHandle=receipt_handle) else: print("No ArchiveSizeInBytes value found in message") print(response) else: print('No messages available in the queue at this time.') time.sleep(1) def ReceiveArchiveChunkMessages(chunk_queue_url): response = sqs.receive_message( QueueUrl=chunk_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=0, MessageAttributeNames=['Message'] ) print("Polling archive chunk queue...") print(response) # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) if 'job_id' in jsonresponse: #checking that there is a job id before continuing job_id = jsonresponse['job_id'] byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end']) chunk_number = jsonresponse['chunk_number'] receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['job_id']: print(f'Received message: {response}') GetJobOutputChunks(job_id,byterange,chunk_number) sqs.delete_message( QueueUrl=chunk_queue_url, ReceiptHandle=receipt_handle) else: print('No messages available in the chunk queue at this time.') def initiate_archive_retrieval(archive_id, topic_arn): glacier = boto3.client('glacier') job_parameters = { "Type": "archive-retrieval", "ArchiveId": archive_id, "Description": "Archive retrieval job", "SNSTopic": topic_arn, "Tier": "Bulk" # You can change this to "Standard" or "Expedited" based on your needs } try: response = glacier.initiate_job( vaultName=vault_name, jobParameters=job_parameters ) print("Archive retrieval job initiated:") print(f"Job ID: {response['jobId']}") print(f"Job parameters: {job_parameters}") print(f"Complete response: {json.dumps(response, indent=2)}") return response['jobId'] except Exception as e: print(f"Error initiating archive retrieval job: {str(e)}") raise def run_async_tasks(chunk_queue_url, workers): max_workers = workers # Set the desired maximum number of concurrent tasks with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for _ in range(max_workers): executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url) # One time setup of the necessary queues and topics. queue_and_topic_atts = setup_queues_and_topic() topic_arn = queue_and_topic_atts['topic_arn'] notify_queue_url = queue_and_topic_atts['notify_queue_url'] chunk_queue_url = queue_and_topic_atts['chunk_queue_url'] if retrieve_archive: print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn) job_id = initiate_archive_retrieval(archive_id, topic_arn) else: print("Retrieve archive is false, polling queues and downloading only.") while True: ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url) run_async_tasks(chunk_queue_url,workers)

Verwenden des Skripts

Gehen Sie wie folgt vor, um dieses Skript zu verwenden:

  1. Ersetzen Sie die Platzhalterwerte im Skript durch Ihre spezifischen Informationen:

    • output_file_path: Verzeichnis, in dem Chunk-Dateien gespeichert werden

    • vault_name: Name Ihres S3 Glacier-Tresors

    • notify_queue_name: Name für die Warteschlange für Jobbenachrichtigungen

    • chunk_download_queue_name: Name für die Chunk-Download-Warteschlange

    • sns_topic_name: Name für das SNS-Thema

    • region: AWS Region, in der sich Ihr Tresor befindet

    • archive_id: ID des abzurufenden Archivs

  2. Führen Sie das Skript aus:

    python download_large_archive.py
  3. Nachdem alle Chunks heruntergeladen wurden, können Sie sie mit einem Befehl wie dem folgenden zu einer einzigen Datei kombinieren:

    cat /path/to/chunks/*.chunk > complete_archive.file

Wichtige Überlegungen

Beachten Sie bei der Verwendung dieses Skripts Folgendes:

  • Das Abrufen von Archiven aus S3 Glacier kann je nach ausgewählter Abrufstufe mehrere Stunden dauern.

  • Das Skript wird unbegrenzt ausgeführt und fragt kontinuierlich die Warteschlangen ab. Möglicherweise möchten Sie eine Kündigungsbedingung hinzufügen, die Ihren spezifischen Anforderungen entspricht.

  • Stellen Sie sicher, dass Sie über ausreichend Festplattenspeicher verfügen, um alle Teile Ihres Archivs zu speichern.

  • Wenn das Skript unterbrochen wird, können Sie es mit neu starten, retrieve_archive=False um mit dem Herunterladen von Chunks fortzufahren, ohne einen neuen Abrufjob zu starten.

  • Passen Sie die workers Parameter chunk_size und an Ihre Netzwerkbandbreite und Systemressourcen an.

  • Für Amazon S3-Abrufe, Amazon SNS- und Amazon SQS SQS-Nutzung AWS fallen Standardgebühren an.