

 **このページは、ボールトと 2012 年リリース当時の REST API を使用する、Amazon Glacier サービスの既存のお客様のみを対象としています。**

アーカイブストレージソリューションをお探しの場合は、Amazon S3 の Amazon Glacier ストレージクラス (S3 Glacier Instant Retrieval、S3 Glacier Flexible Retrieval、S3 Glacier Deep Archive) を使用することをお勧めします。これらのストレージオプションの詳細については、「[Amazon Glacier ストレージクラス](https://aws.amazon.com/s3/storage-classes/glacier/)」を参照してください。

Amazon Glacier (元のスタンドアロンボールトベースのサービス) は、新規顧客を受け入れなくなりました。Amazon Glacier は、ボールトにデータを保存する独自の API を備えたスタンドアロンサービスであり、Amazon S3 および Amazon S3 Glacier ストレージクラスとは異なります。既存のデータは Amazon Glacier で無期限に安全性が確保され、引き続きアクセス可能です。移行は必要ありません。低コストの長期アーカイブストレージの場合、 は [Amazon S3 Glacier ストレージクラス](https://aws.amazon.com/s3/storage-classes/glacier/) AWS を推奨します。これにより、S3 バケットベースの APIs、低コスト、 AWS サービス統合で優れたカスタマーエクスペリエンスを実現できます。 AWS リージョン 拡張機能が必要な場合は、[Amazon Glacier ボールトから Amazon S3 Glacier ストレージクラスにデータを転送するためのAWS ソリューションガイダンス](https://aws.amazon.com/solutions/guidance/data-transfer-from-amazon-s3-glacier-vaults-to-amazon-s3/)を使用して、Amazon S3 Glacier ストレージクラスへの移行を検討してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Python での並列処理を使用した大きなアーカイブのダウンロード
<a name="downloading-large-archive-parallel-python"></a>

このトピックでは、Python での並列処理を使用して Amazon S3 Glacier (S3 Glacier) から大きなアーカイブをダウンロードする方法について説明します。この方法では、個別に処理できるように細かく分割することで、どのサイズのアーカイブも確実にダウンロードできます。

## 概要:
<a name="downloading-large-archive-python-overview"></a>

この例で提供されている Python スクリプトは、次のタスクを実行します。

1. 通知に必要な AWS リソース (Amazon SNS トピックと Amazon SQS キュー) を設定します。

1. Amazon Glacier でアーカイブ取得ジョブを開始します

1. ジョブ完了通知の Amazon SQS キューをモニタリングします

1. 大きなアーカイブを管理しやすいチャンクに分割します

1. 複数のワーカースレッドを使用してチャンクを並行してダウンロードします

1. 後で再組み立てできるように各チャンクをディスクに保存します

## 前提条件
<a name="downloading-large-archive-python-prerequisites"></a>

開始する前に、以下の準備が整っていることを確認します。
+ Python 3.6 以降がインストールされていること。
+ AWS SDK for Python (Boto3) がインストールされている
+ AWS Amazon Glacier、Amazon SNS、Amazon SQS の適切なアクセス許可で設定された 認証情報
+ ダウンロードしたアーカイブチャンクを保存するのに十分なディスク容量

## 例: Python での並列処理を使用したアーカイブのダウンロード
<a name="downloading-large-archive-python-code"></a>

次の Python スクリプトは、並列処理を使用して Amazon Glacier から大きなアーカイブをダウンロードする方法を示しています。

```
import boto3
import time
import json
import jmespath
import re
import concurrent.futures
import os

output_file_path = "{{output_directory_path}}"
vault_name = "{{vault_name}}"

chunk_size = 1000000000 #1gb - size of chunks for parallel download.
notify_queue_name = '{{GlacierJobCompleteNotifyQueue}}' # SQS queue for Glacier recall notification
chunk_download_queue_name='{{GlacierChunkReadyNotifyQueue}}' # SQS queue for chunks
sns_topic_name = '{{GlacierRecallJobCompleted}}' # the SNS topic to be notified when Glacier archive is restored.
chunk_queue_visibility_timeout = 7200 # 2 hours - this may need to be adjusted.
region = '{{us-east-1}}'
archive_id = "{{archive_id_to_restore}}"
retrieve_archive = True # set to false if you do not want to restore from Glacier - useful for restarting or parallel processing of the chunk queue.
workers = 12 # the number of parallel worker threads for downloading chunks. 

def setup_queues_and_topic():
    sqs = boto3.client('sqs')
    sns = boto3.client('sns')

    # Create the SNS topic
    topic_response = sns.create_topic(
        Name=sns_topic_name
    )
    topic_arn = topic_response['TopicArn']
    print("Creating the SNS topic " + topic_arn)

    # Create the notification queue
    notify_queue_response = sqs.create_queue(
        QueueName=notify_queue_name,
        Attributes={
            'VisibilityTimeout': '300',  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '20'  # Enable long polling
        }
    )
    notify_queue_url = notify_queue_response['QueueUrl']
    print("Creating the archive-retrieval notification queue " + notify_queue_url)

    # Create the chunk download queue
    chunk_queue_response = sqs.create_queue(
        QueueName=chunk_download_queue_name,
        Attributes={
            'VisibilityTimeout': str(chunk_queue_visibility_timeout),  # 5 minutes
            'ReceiveMessageWaitTimeSeconds': '0'
        }
    )
    chunk_queue_url = chunk_queue_response['QueueUrl']

    print("Creating the chunk ready notification queue " + chunk_queue_url)


   # Get the ARN for the notification queue
    notify_queue_attributes = sqs.get_queue_attributes(
        QueueUrl=notify_queue_url,
        AttributeNames=['QueueArn']
    )
    notify_queue_arn = notify_queue_attributes['Attributes']['QueueArn']

    # Set up the SNS topic policy on the notification queue
    queue_policy = {
        "Version": "2012-10-17",		 	 	 
        "Statement": [{
            "Sid": "allow-sns-messages",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "SQS:SendMessage",
            "Resource": notify_queue_arn,
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": topic_arn
                }
            }
        }]
    }

    # Set the queue policy
    sqs.set_queue_attributes(
        QueueUrl=notify_queue_url,
        Attributes={
            'Policy': json.dumps(queue_policy)
        }
    )

    # Subscribe the notification queue to the SNS topic
    sns.subscribe(
        TopicArn=topic_arn,
        Protocol='sqs',
        Endpoint=notify_queue_arn
    )

    return {
        'topic_arn': topic_arn,
        'notify_queue_url': notify_queue_url,
        'chunk_queue_url': chunk_queue_url
    }


def split_and_send_chunks(archive_size, job_id,chunk_queue_url):
    ranges = []
    current = 0
    chunk_number = 0

    while current < archive_size:
        chunk_number += 1
        next_range = min(current + chunk_size - 1, archive_size - 1)
        ranges.append((current, next_range, chunk_number))
        current = next_range + 1

    # Send messages to SQS queue
    for start, end, chunk_number in ranges:
        body = {"start": start, "end": end, "job_id": job_id, "chunk_number": chunk_number}
        body = json.dumps(body)
        print("Sending SQS message for range:" + str(body))
        response = sqs.send_message(
            QueueUrl=chunk_queue_url,
            MessageBody=str(body)
        )

def GetJobOutputChunks(job_id, byterange, chunk_number):
    glacier = boto3.client('glacier')
    response = glacier.get_job_output(
        vaultName=vault_name,
        jobId=job_id,
        range=byterange,

    )

    with open(os.path.join(output_file_path,str(chunk_number)+".chunk"), 'wb') as output_file:
        output_file.write(response['body'].read())

    return response

def ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url):

    response = sqs.receive_message(
        QueueUrl=notify_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20,
        MessageAttributeNames=['Message']
    )
    print("Polling archive retrieval job ready queue...")
    # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty

    if 'Messages' in response:
        print("Received a message from the archive retrieval job queue")
        jsonresponse = response
        # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing.
        jsonresponse=json.loads(jsonresponse['Messages'][0]['Body'])
        jsonresponse=json.loads(jsonresponse['Message'])
        if 'ArchiveSizeInBytes' in jsonresponse:
            receipt_handle = response['Messages'][0]['ReceiptHandle']    
            if jsonresponse['ArchiveSizeInBytes']:
                archive_size = jsonresponse['ArchiveSizeInBytes']

                print(f'Received message: {response}')      
                if archive_size > chunk_size:
                    split_and_send_chunks(archive_size, jsonresponse['JobId'],chunk_queue_url)

                    sqs.delete_message(
                    QueueUrl=notify_queue_url,
                    ReceiptHandle=receipt_handle)

            else:
                print("No ArchiveSizeInBytes value found in message")
                print(response)

    else:
        print('No messages available in the queue at this time.')

    time.sleep(1)

def ReceiveArchiveChunkMessages(chunk_queue_url):
    response = sqs.receive_message(
        QueueUrl=chunk_queue_url,
        AttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=0,
        MessageAttributeNames=['Message']
    )
    print("Polling archive chunk queue...")
    print(response)
    # Checking that there is a Messages key before proceeding. No 'Messages' key likely means the queue is empty
    if 'Messages' in response:
        jsonresponse = response
        # Loading the string into JSON and checking that ArchiveSizeInBytes key is present before continuing.
        jsonresponse=json.loads(jsonresponse['Messages'][0]['Body'])
        if 'job_id' in jsonresponse: #checking that there is a job id before continuing
            job_id = jsonresponse['job_id']
            byterange = "bytes="+str(jsonresponse['start']) + '-' + str(jsonresponse['end'])
            chunk_number = jsonresponse['chunk_number']
            receipt_handle = response['Messages'][0]['ReceiptHandle']
            if jsonresponse['job_id']:
                print(f'Received message: {response}')
                GetJobOutputChunks(job_id,byterange,chunk_number)
                sqs.delete_message(
                QueueUrl=chunk_queue_url,
                ReceiptHandle=receipt_handle)
    else:
        print('No messages available in the chunk queue at this time.')

def initiate_archive_retrieval(archive_id, topic_arn):
    glacier = boto3.client('glacier')

    job_parameters = {
        "Type": "archive-retrieval",
        "ArchiveId": archive_id,
        "Description": "Archive retrieval job",
        "SNSTopic": topic_arn,
        "Tier": "Bulk"  # You can change this to "Standard" or "Expedited" based on your needs
    }

    try:
        response = glacier.initiate_job(
            vaultName=vault_name,
            jobParameters=job_parameters
        )

        print("Archive retrieval job initiated:")
        print(f"Job ID: {response['jobId']}")
        print(f"Job parameters: {job_parameters}")
        print(f"Complete response: {json.dumps(response, indent=2)}")

        return response['jobId']

    except Exception as e:
        print(f"Error initiating archive retrieval job: {str(e)}")
        raise

def run_async_tasks(chunk_queue_url, workers):
    max_workers = workers  # Set the desired maximum number of concurrent tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for _ in range(max_workers):
            executor.submit(ReceiveArchiveChunkMessages, chunk_queue_url)

# One time setup of the necessary queues and topics. 
queue_and_topic_atts = setup_queues_and_topic()

topic_arn = queue_and_topic_atts['topic_arn']
notify_queue_url = queue_and_topic_atts['notify_queue_url']
chunk_queue_url = queue_and_topic_atts['chunk_queue_url']

if retrieve_archive:
    print("Retrieving the defined archive... The topic arn we will notify when recalling the archive is: "+topic_arn)
    job_id = initiate_archive_retrieval(archive_id, topic_arn)
else:
    print("Retrieve archive is false, polling queues and downloading only.")

while True:
   ReceiveArchiveReadyMessages(notify_queue_url,chunk_queue_url)
   run_async_tasks(chunk_queue_url,workers)
```

## スクリプトを使用する
<a name="downloading-large-archive-python-usage"></a>

このスクリプトを使用するには、次のステップに従います。

1. スクリプトのプレースホルダー値をユーザー固有の情報に置き換えます。
   + {{output\_file\_path}}: チャンクファイルが保存されるディレクトリ
   + {{vault\_name}}: S3 Glacier ボールトの名前
   + {{notify\_queue\_name}}: ジョブ通知キューの名前
   + {{chunk\_download\_queue\_name}}: チャンクダウンロードキューの名前
   + {{sns\_topic\_name}}: SNS トピックの名前
   + {{region}}: ボールトが配置されている AWS リージョン
   + {{archive\_id}}: 取得するアーカイブの ID

1.  スクリプトを実行します。

   ```
   python download_large_archive.py
   ```

1. すべてのチャンクをダウンロードしたら、次のようなコマンドを使用して 1 つのファイルにまとめることができます。

   ```
   cat /path/to/chunks/*.chunk > complete_archive.file
   ```

## 重要な考慮事項
<a name="downloading-large-archive-python-considerations"></a>

このスクリプトを使用する場合は、次の点に留意してください。
+ 選択した取得階層によって、S3 Glacier からのアーカイブの取得を完了するまでに数時間かかる場合があります。
+ スクリプトは無期限に実行され、キューを継続的にポーリングします。特定の要件に基づいて終了条件を追加することもできます。
+ アーカイブのすべてのチャンクを保存するのに十分なディスク容量があることを確認します。
+ スクリプトが中断された場合は、`retrieve_archive=False` で再起動して、新しい取得ジョブを開始せずにチャンクのダウンロードを続行できます。
+ ネットワーク帯域幅とシステムリソースに基づいて、{{chunk\_size}} パラメータと {{workers}} パラメータを調整します。
+ Amazon S3 の取得、Amazon SNS、Amazon SQS の使用には、標準 AWS 料金が適用されます。