Esta página destina-se somente a clientes atuais do serviço Amazon Glacier que usam cofres e a API REST original de 2012.
Se você estiver procurando soluções de armazenamento de arquivos do Amazon Glacier, recomendamos usar as classes de armazenamento do Amazon S3, S3 Glacier Instant Retrieval, S3 Glacier Flexible Retrieval e S3 Glacier Deep Archive. Para saber mais sobre essas opções de armazenamento, consulte Classes de armazenamento do Amazon Glacier
O Amazon Glacier (serviço autônomo original baseado em cofre) não aceitará mais novos clientes a partir de 15 de dezembro de 2025, sem impacto para os clientes existentes. O Amazon Glacier é um serviço independente com suas próprias APIs que armazena dados em cofres e é diferente das classes de armazenamento do Amazon S3 e Amazon S3 Glacier. Seus dados existentes permanecerão seguros e acessíveis no Amazon Glacier indefinidamente. Nenhuma migração é necessária. Para armazenamento de arquivamento de baixo custo e longo prazo, a AWS recomenda as classes de armazenamento do Amazon S3 Glacier
Baixando um arquivo grande usando processamento paralelo com Python
Este tópico descreve como baixar um arquivo grande do Amazon S3 Glacier (S3 Glacier) usando processamento paralelo com Python. Essa abordagem permite que você baixe arquivos de qualquer tamanho de forma confiável, dividindo-os em partes menores que podem ser processadas de forma independente.
Visão geral
O script Python fornecido nesse exemplo realiza as seguintes tarefas:
-
Configura os recursos da AWS necessários (tópico do Amazon SNS e filas do Amazon SQS) para notificações
-
Inicia um trabalho de recuperação de arquivo com o Amazon Glacier
-
Monitora uma fila do Amazon SQS em busca de notificações de conclusão de trabalho
-
Divide o grande arquivo em partes gerenciáveis
-
Baixa partes em paralelo usando vários threads de trabalho
-
Salva cada pedaço em disco para posterior remontagem
Pré-requisitos
Antes de começar, você deve ter o seguinte:
-
Python 3.6 ou superior instalado.
-
SDK da AWS para Python (Boto3) instalado.
-
Credenciais da AWS configuradas com as permissões apropriadas para o Amazon Glacier, Amazon SNS e Amazon SQS
-
Espaço em disco suficiente para armazenar os pedaços de arquivo baixados
Exemplo: baixando um arquivo usando processamento paralelo com Python
O script Python a seguir demonstra como fazer o download de um grande arquivo do Amazon Glacier usando processamento paralelo:
import boto3 import time import json import jmespath import re import concurrent.futures import os output_file_path = "output_directory_path" vault_name = "vault_name" chunk_size = 1000000000 #1gb - size of chunks for parallel download. notify_queue_name = 'GlacierJobCompleteNotifyQueue' # SQS queue for Glacier recall notification chunk_download_queue_name='GlacierChunkReadyNotifyQueue' # SQS queue for chunks sns_topic_name = 'GlacierRecallJobCompleted' # the SNS topic to be notified when Glacier archive is restored. chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted. region = 'us-east-1' archive_id = "archive_id_to_restore" retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue. workers = 12 # the number of parallel worker threads for downloading chunks. def setup_queues_and_topic(): sqs = boto3.client('sqs') sns = boto3.client('sns') # Create the SNS topic topic_response = sns.create_topic( Name=sns_topic_name ) topic_arn = topic_response['TopicArn'] print("Creating the SNS topic " + topic_arn) # Create the notification queue notify_queue_response = sqs.create_queue( QueueName=notify_queue_name, Attributes={ 'VisibilityTimeout': '300', # 5 minutes 'ReceiveMessageWaitTimeSeconds': '20' # Enable long polling } ) notify_queue_url = notify_queue_response['QueueUrl'] print("Creating the archive-retrieval notification queue " + notify_queue_url) # Create the chunk download queue chunk_queue_response = sqs.create_queue( QueueName=chunk_download_queue_name, Attributes={ 'VisibilityTimeout': str(chunk_queue_visibility_timeout), # 5 minutes 'ReceiveMessageWaitTimeSeconds': '0' } ) chunk_queue_url = chunk_queue_response['QueueUrl'] print("Creating the chunk ready notification queue " + chunk_queue_url) # Get the ARN for the notification queue notify_queue_attributes = sqs.get_queue_attributes( QueueUrl=notify_queue_url, AttributeNames=['QueueArn'] ) notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn'] # Set up the SNS topic policy on the notification queue queue_policy = { "Version": "2012-10-17", "Statement": [{ "Sid": "allow-sns-messages", "Effect": "Allow", "Principal": {"AWS": "*"}, "Action": "SQS:SendMessage", "Resource": notify_queue_arn, "Condition": { "ArnEquals": { "aws:SourceArn": topic_arn } } }] } # Set the queue policy sqs.set_queue_attributes( QueueUrl=notify_queue_url, Attributes={ 'Policy': json.dumps(queue_policy) } ) # Subscribe the notification queue to the SNS topic sns.subscribe( TopicArn=topic_arn, Protocol='sqs', Endpoint=notify_queue_arn ) return { 'topic_arn': topic_arn, 'notify_queue_url': notify_queue_url, 'chunk_queue_url': chunk_queue_url } def split_and_send_chunks(archive_size, job_id,chunk_queue_url): ranges = [] current = 0 chunk_number = 0 while current < archive_size: chunk_number += 1 next_range = min(current + chunk_size - 1, archive_size - 1) ranges.append((current, next_range, chunk_number)) current = next_range + 1 # Send messages to SQS queue for start, end, chunk_number in ranges: body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number} body = json.dumps(body) print("Sending SQS message for range:" + str(body)) response = sqs.send_message( QueueUrl=chunk_queue_url, MessageBody=str(body) ) def GetJobOutputChunks(job_id, byterange, chunk_number): glacier = boto3.client('glacier') response = glacier.get_job_output( vaultName=vault_name, jobId=job_id, range=byterange, ) with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file: output_file.write(response['body'].read()) return response def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url): response = sqs.receive_message( QueueUrl=notify_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=20, MessageAttributeNames=['Message'] ) print("Polling archive retrieval job ready queue...") # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: print("Received a message from the archive retrieval job queue") jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) jsonresponse=json.loads(jsonresponse['Message']) if 'ArchiveSizeInBytes' in jsonresponse: receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['ArchiveSizeInBytes']: archive_size = jsonresponse['ArchiveSizeInBytes'] print(f'Received message: {response}') if archive_size > chunk_size: split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url) sqs.delete_message( QueueUrl=notify_queue_url, ReceiptHandle=receipt_handle) else: print("No ArchiveSizeInBytes value found in message") print(response) else: print('No messages available in the queue at this time.') time.sleep(1) def ReceiveArchiveChunkMessages(chunk_queue_url): response = sqs.receive_message( QueueUrl=chunk_queue_url, AttributeNames=['All'], MaxNumberOfMessages=1, WaitTimeSeconds=0, MessageAttributeNames=['Message'] ) print("Polling archive chunk queue...") print(response) # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty if 'Messages' in response: jsonresponse = response # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing. jsonresponse=json.loads(jsonresponse['Messages'][0]['Body']) if 'job_id' in jsonresponse: #checking that there is a job id before continuing job_id = jsonresponse['job_id'] byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end']) chunk_number = jsonresponse['chunk_number'] receipt_handle = response['Messages'][0]['ReceiptHandle'] if jsonresponse['job_id']: print(f'Received message: {response}') GetJobOutputChunks(job_id,byterange,chunk_number) sqs.delete_message( QueueUrl=chunk_queue_url, ReceiptHandle=receipt_handle) else: print('No messages available in the chunk queue at this time.') def initiate_archive_retrieval(archive_id, topic_arn): glacier = boto3.client('glacier') job_parameters = { "Type": "archive-retrieval", "ArchiveId": archive_id, "Description": "Archive retrieval job", "SNSTopic": topic_arn, "Tier": "Bulk" # You can change this to "Standard" or "Expedited" based on your needs } try: response = glacier.initiate_job( vaultName=vault_name, jobParameters=job_parameters ) print("Archive retrieval job initiated:") print(f"Job ID: {response['jobId']}") print(f"Job parameters: {job_parameters}") print(f"Complete response: {json.dumps(response, indent=2)}") return response['jobId'] except Exception as e: print(f"Error initiating archive retrieval job: {str(e)}") raise def run_async_tasks(chunk_queue_url, workers): max_workers = workers # Set the desired maximum number of concurrent tasks with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: for _ in range(max_workers): executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url) # One time setup of the necessary queues and topics. queue_and_topic_atts = setup_queues_and_topic() topic_arn = queue_and_topic_atts['topic_arn'] notify_queue_url = queue_and_topic_atts['notify_queue_url'] chunk_queue_url = queue_and_topic_atts['chunk_queue_url'] if retrieve_archive: print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn) job_id = initiate_archive_retrieval(archive_id, topic_arn) else: print("Retrieve archive is false, polling queues and downloading only.") while True: ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url) run_async_tasks(chunk_queue_url,workers)
Uso de um script
Para usar esse script, siga estas etapas:
-
Substitua os valores de espaço reservado no script por suas informações específicas:
-
output_file_path: Diretório onde os arquivos fragmentados serão salvos -
vault_name:Nome do seu cofre do S3 Glacier -
notify_queue_name:Nome da fila de notificação de trabalho -
chunk_download_queue_name:Nome da fila de download do fragmento -
sns_topic_name:Nome do tópico do SNS -
region: região da AWS onde seu cofre está localizado -
archive_id:ID do arquivo a ser recuperado
-
-
Execute o script :
python download_large_archive.py -
Depois que todos os fragmentos forem baixados, você poderá combiná-los em um único arquivo usando um comando como:
cat /path/to/chunks/*.chunk > complete_archive.file
Considerações importantes
Ao usar esse script, lembre-se do seguinte:
-
A recuperação de arquivo do S3 Glacier pode levar várias horas, dependendo do nível de recuperação selecionado.
-
O script é executado indefinidamente, pesquisando continuamente as filas. Você pode adicionar uma condição de enceramento com base em seus requisitos específicos.
-
Certifique-se de ter espaço em disco suficiente para armazenar todos os fragmentos do seu arquivo.
-
Se o script for interrompido, você poderá reiniciá-lo com
retrieve_archive=Falsepara continuar baixando fragmentos sem iniciar um novo trabalho de recuperação. -
Ajuste os parâmetros
chunk_sizeeworkerscom base na largura de banda da rede e nos recursos do sistema. -
As cobranças da AWS padrão se aplicam às recuperações do Amazon S3, Amazon SNS e uso do Amazon SQS.