

 **此頁面僅適用於使用 Vaults 和 2012 年原始 REST API 的 Amazon Glacier 服務的現有客戶。**

如果您要尋找封存儲存解決方案，建議您在 Amazon Glacier Instant Retrieval、S3 Glacier Flexible Retrieval 和 S3 Glacier Deep Archive 中使用 Amazon Glacier 儲存類別。 Amazon S3 若要進一步了解這些儲存選項，請參閱 [Amazon Glacier 儲存類別](https://aws.amazon.com/s3/storage-classes/glacier/)。

Amazon Glacier （原始獨立保存庫型服務） 不再接受新客戶。Amazon Glacier 是一項獨立服務，具有自己的 APIs，可將資料存放在保存庫中，並與 Amazon S3 和 Amazon S3 Glacier 儲存類別不同。您現有的資料將在 Amazon Glacier 中無限期保持安全且可存取。不需要遷移。對於低成本、長期的封存儲存， AWS 建議使用 [Amazon S3 Glacier 儲存類別](https://aws.amazon.com/s3/storage-classes/glacier/)，透過 S3 儲存貯體型 APIs、完整 AWS 區域 可用性、降低成本 AWS 和服務整合，提供卓越的客戶體驗。如果您想要增強功能，請考慮使用我們的解決方案指南，將資料從 Amazon S3 Glacier 保存庫傳輸至 Amazon S3 Glacier 儲存類別，以遷移至 Amazon S3 Glacier 儲存類別。 [AWS Amazon Glacier Amazon S3 ](https://aws.amazon.com/solutions/guidance/data-transfer-from-amazon-s3-glacier-vaults-to-amazon-s3/)

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Python 平行處理下載大型封存
<a name="downloading-large-archive-parallel-python"></a>

本主題說明如何使用 Python 平行處理從 Amazon S3 Glacier (S3 Glacier) 下載大型封存。此方法可讓您將任何大小的封存可靠地下載，方法是將其分成可獨立處理的較小部分。

## 概觀
<a name="downloading-large-archive-python-overview"></a>

此範例中提供的 Python 指令碼會執行下列任務：

1. 設定通知的必要 AWS 資源 (Amazon SNS 主題和 Amazon SQS 佇列）

1. 使用 Amazon Glacier 啟動封存擷取任務

1. 監控任務完成通知的 Amazon SQS 佇列

1. 將大型封存分割為可管理區塊

1. 使用多個工作者執行緒平行下載區塊

1. 將每個區塊儲存至磁碟，以供日後重組

## 先決條件
<a name="downloading-large-archive-python-prerequisites"></a>

開始之前，請確定您已：
+ 已安裝 Python 3.6 或更新版本
+ AWS 已安裝適用於 Python (Boto3) 的 SDK
+ AWS 針對 Amazon Glacier、Amazon SNS 和 Amazon SQS 設定具有適當許可的憑證
+ 有足夠的磁碟空間來存放下載的封存區塊

## 範例：使用 Python 平行處理下載封存
<a name="downloading-large-archive-python-code"></a>

下列 Python 指令碼示範如何使用平行處理從 Amazon Glacier 下載大型封存：

```
import boto3
import time
import json
import jmespath
import re
import concurrent.futures
import os

output_file_path = "{{output_directory_path}}"
vault_name = "{{vault_name}}"

chunk_size = 1000000000 #1gb - size of chunks for parallel download.
notify_queue_name = '{{GlacierJobCompleteNotifyQueue}}' # SQS queue for Glacier recall notification
chunk_download_queue_name='{{GlacierChunkReadyNotifyQueue}}' # SQS queue for chunks
sns_topic_name = '{{GlacierRecallJobCompleted}}' # the SNS topic to be notified when Glacier archive is restored.
chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted.
region = '{{us-east-1}}'
archive_id = "{{archive_id_to_restore}}"
retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue.
workers = 12 # the number of parallel worker threads for downloading chunks. 

def setup_queues_and_topic():
    sqs = boto3.client('sqs')
    sns = boto3.client('sns')

    # Create the SNS topic
    topic_response = sns.create_topic(
        Name=sns_topic_name
    )
    topic_arn = topic_response['TopicArn']
    print("Creating the SNS topic " + topic_arn)

    # Create the notification queue
    notify_queue_response = sqs.create_queue(
        QueueName=notify_queue_name,
        Attributes={
            'VisibilityTimeout': '300',  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '20'  # Enable long polling
        }
    )
    notify_queue_url = notify_queue_response['QueueUrl']
    print("Creating the archive-retrieval notification queue " + notify_queue_url)

    # Create the chunk download queue
    chunk_queue_response = sqs.create_queue(
        QueueName=chunk_download_queue_name,
        Attributes={
            'VisibilityTimeout': str(chunk_queue_visibility_timeout),  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '0'
        }
    )
    chunk_queue_url = chunk_queue_response['QueueUrl']

    print("Creating the chunk ready notification queue " + chunk_queue_url)


   # Get the ARN for the notification queue
    notify_queue_attributes = sqs.get_queue_attributes(
        QueueUrl=notify_queue_url,
        AttributeNames=['QueueArn']
    )
    notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn']

    # Set up the SNS topic policy on the notification queue
    queue_policy = {
        "Version": "2012-10-17",		 	 	 
        "Statement": [{
            "Sid": "allow-sns-messages",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "SQS:SendMessage",
            "Resource": notify_queue_arn,
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": topic_arn
                }
            }
        }]
    }

    # Set the queue policy
    sqs.set_queue_attributes(
        QueueUrl=notify_queue_url,
        Attributes={
            'Policy': json.dumps(queue_policy)
        }
    )

    # Subscribe the notification queue to the SNS topic
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=notify_queue_arn
    )

    return {
        'topic_arn': topic_arn,
        'notify_queue_url': notify_queue_url,
        'chunk_queue_url': chunk_queue_url
    }


def split_and_send_chunks(archive_size, job_id,chunk_queue_url):
    ranges = []
    current = 0
    chunk_number = 0

    while current < archive_size:
        chunk_number += 1
        next_range = min(current + chunk_size - 1, archive_size - 1)
        ranges.append((current, next_range, chunk_number))
        current = next_range + 1

    # Send messages to SQS queue
    for start, end, chunk_number in ranges:
        body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number}
        body = json.dumps(body)
        print("Sending SQS message for range:" + str(body))
        response = sqs.send_message(
            QueueUrl=chunk_queue_url,
            MessageBody=str(body)
        )

def GetJobOutputChunks(job_id, byterange, chunk_number):
    glacier = boto3.client('glacier')
    response = glacier.get_job_output(
        vaultName=vault_name,
        jobId=job_id,
        range=byterange,

    )

    with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file:
        output_file.write(response['body'].read())

    return response

def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url):

    response = sqs.receive_message(
        QueueUrl=notify_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        MessageAttributeNames=['Message']
    )
    print("Polling archive retrieval job ready queue...")
    # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty

    if 'Messages' in response:
        print("Received a message from the archive retrieval job queue")
        jsonresponse = response
        # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing.
        jsonresponse=json.loads(jsonresponse['Messages'][0]['Body'])
        jsonresponse=json.loads(jsonresponse['Message'])
        if 'ArchiveSizeInBytes' in jsonresponse:
            receipt_handle = response['Messages'][0]['ReceiptHandle']    
            if jsonresponse['ArchiveSizeInBytes']:
                archive_size = jsonresponse['ArchiveSizeInBytes']

                print(f'Received message: {response}')      
                if archive_size > chunk_size:
                    split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url)

                    sqs.delete_message(
                    QueueUrl=notify_queue_url,
                    ReceiptHandle=receipt_handle)

            else:
                print("No ArchiveSizeInBytes value found in message")
                print(response)

    else:
        print('No messages available in the queue at this time.')

    time.sleep(1)

def ReceiveArchiveChunkMessages(chunk_queue_url):
    response = sqs.receive_message(
        QueueUrl=chunk_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=0,
        MessageAttributeNames=['Message']
    )
    print("Polling archive chunk queue...")
    print(response)
    # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty
    if 'Messages' in response:
        jsonresponse = response
        # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing.
        jsonresponse=json.loads(jsonresponse['Messages'][0]['Body'])
        if 'job_id' in jsonresponse: #checking that there is a job id before continuing
            job_id = jsonresponse['job_id']
            byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end'])
            chunk_number = jsonresponse['chunk_number']
            receipt_handle = response['Messages'][0]['ReceiptHandle']
            if jsonresponse['job_id']:
                print(f'Received message: {response}')
                GetJobOutputChunks(job_id,byterange,chunk_number)
                sqs.delete_message(
                QueueUrl=chunk_queue_url,
                ReceiptHandle=receipt_handle)
    else:
        print('No messages available in the chunk queue at this time.')

def initiate_archive_retrieval(archive_id, topic_arn):
    glacier = boto3.client('glacier')

    job_parameters = {
        "Type": "archive-retrieval",
        "ArchiveId": archive_id,
        "Description": "Archive retrieval job",
        "SNSTopic": topic_arn,
        "Tier": "Bulk"  # You can change this to "Standard" or "Expedited" based on your needs
    }

    try:
        response = glacier.initiate_job(
            vaultName=vault_name,
            jobParameters=job_parameters
        )

        print("Archive retrieval job initiated:")
        print(f"Job ID: {response['jobId']}")
        print(f"Job parameters: {job_parameters}")
        print(f"Complete response: {json.dumps(response, indent=2)}")

        return response['jobId']

    except Exception as e:
        print(f"Error initiating archive retrieval job: {str(e)}")
        raise

def run_async_tasks(chunk_queue_url, workers):
    max_workers = workers  # Set the desired maximum number of concurrent tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for _ in range(max_workers):
            executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url)

# One time setup of the necessary queues and topics. 
queue_and_topic_atts = setup_queues_and_topic()

topic_arn = queue_and_topic_atts['topic_arn']
notify_queue_url = queue_and_topic_atts['notify_queue_url']
chunk_queue_url = queue_and_topic_atts['chunk_queue_url']

if retrieve_archive:
    print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn)
    job_id = initiate_archive_retrieval(archive_id, topic_arn)
else:
    print("Retrieve archive is false, polling queues and downloading only.")

while True:
   ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url)
   run_async_tasks(chunk_queue_url,workers)
```

## 使用指令碼
<a name="downloading-large-archive-python-usage"></a>

若要使用此指令碼，請遵循下列步驟：

1. 將指令碼中的預留位置值取代為您的特定資訊：
   + {{output\_file\_path}}：儲存區塊檔案的目錄
   + {{vault\_name}}：S3 Glacier 保存庫的名稱
   + {{notify\_queue\_name}}：任務通知佇列的名稱
   + {{chunk\_download\_queue\_name}}：區塊下載佇列的名稱
   + {{sns\_topic\_name}}：SNS 主題的名稱
   + {{region}}：region 您的保存庫所在的 AWS 區域
   + {{archive\_id}}：要擷取的封存 ID

1. 執行 指令碼：

   ```
   python download_large_archive.py
   ```

1. 下載所有區塊之後，您可以使用下列命令將它們合併為單一檔案：

   ```
   cat /path/to/chunks/*.chunk > complete_archive.file
   ```

## 重要考量
<a name="downloading-large-archive-python-considerations"></a>

使用此指令碼時，請記住下列事項：
+ 從 S3 Glacier 擷取封存可能需要數小時才能完成，具體取決於選取的擷取層。
+ 指令碼會無限期執行，持續輪詢佇列。您可能想要根據您的特定需求新增終止條件。
+ 請確定您有足夠的磁碟空間來存放封存的所有區塊。
+ 如果指令碼中斷，您可以使用 重新啟動指令碼`retrieve_archive=False`，以繼續下載區塊，而無需啟動新的擷取任務。
+ 根據您的網路頻寬和系統資源調整 {{chunk\_size}} 和{{工作者}}參數。
+ Amazon S3 擷取、Amazon SNS 和 Amazon SQS 用量需支付標準 AWS 費用。