AWS SDK for Rust での同時実行
AWS SDK for Rust は同時実行制御を提供しませんが、ユーザーは独自の同時実行制御を実装するための多くのオプションがあります。
用語
このテーマに関連する用語は混同されやすく、もともと別々の概念を表していたにもかかわらず、同義語となった用語もあります。このガイドでは、以下の用語を定義します。
-
タスク: プログラムが完了するまで実行する、または完了するまで実行を試みる「作業単位」。
-
シーケンシャルコンピューティング: 複数のタスクが順番に実行される場合。
-
同時実行コンピューティング: 複数のタスクが時間的に重なり合って実行される場合。
-
同時実行: コンピュータが複数のタスクを任意の順序で完了する能力。
-
マルチタスク: コンピュータが複数のタスクを同時に実行する能力。
-
レース条件: タスクの開始タイミング、またはタスクの処理時間に基づいてプログラムの動作が変化する場合。
-
競合: 共有リソースへのアクセスの競合。複数のタスクがリソースに同時にアクセスする場合、そのリソースは「競合中」になります。
-
デッドロック: これ以上進行できない状態。これは通常、2 つのタスクが互いのリソースを取得しようとしているが、どちらのタスクも他方のリソースが利用可能になるまでリソースを解放しないために発生します。デッドロックが発生すると、プログラムが部分的または完全に応答しなくなります。
シンプルな例
最初の例は、シーケンシャルプログラムです。後の例では、このコードを同時実行手法で変更します。後の例では、同じ build_client_and_list_objects_to_download() メソッドを再利用して、main() 内で変更を加えます。次のコマンドを実行して、プロジェクトに依存関係を追加します。
-
cargo add aws-sdk-s3 -
cargo add aws-config tokio --features tokio/full
次のタスク例では、Amazon Simple Storage Service バケット内のすべてのファイルをダウンロードしています。
-
まず、すべてのファイルを一覧表示します。キーをリストに保存します。
-
リストをイテレーションして、各ファイルを順番にダウンロードする
use aws_sdk_s3::{Client, Error}; const EXAMPLE_BUCKET: &str = "amzn-s3-demo-bucket"; // Update to name of bucket you own. // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() -> (Client, Vec<String>) { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
注記
これらの例では、エラーは処理されず、サンプルバケットにファイルパスに似たキーを持つオブジェクトが存在しないことを前提としています。したがって、ネストされたディレクトリの作成については説明しません。
最新のコンピュータのアーキテクチャを使用すれば、このプログラムをはるかに効率的なものに書き直すことができます。これについては後の例で説明しますが、まずは概念についてさらにいくつか説明します。
所有権と可変性
Rust のそれぞれの値には単一の所有者を持ちます。所有者が範囲から外れると、その所有者が所有するすべての値も削除されます。所有者は、1 つ以上の値に対する変更不可能な参照または 1 つの変更可能な参照を提供できます。Rust コンパイラは、参照がその所有者を越えて存在しないことを保証します。
複数のタスクが同じリソースに可変的にアクセスする必要がある場合は、追加の計画と設計が必要になります。シーケンシャルコンピューティングでは、各タスクは順番に実行されるため、競合することなく可変的に同じリソースにアクセスできます。ただし、同時コンピューティングでは、タスクは任意の順序で同時に実行できます。したがって、複数の変更可能な参照が不可能であることをコンパイラに証明するため (または、少なくともクラッシュするように)、さらに多くの対策が必要です。
Rust 標準ライブラリには、これを実現するための多くのツールが用意されています。これらのトピックの詳細については、「Rust Programming Language」書籍の「変数と可変性
その他の用語
以下は「同期オブジェクト」の一覧です。これらは、コンパイラに対して同時実行プログラムが所有権ルールに反しないことを保証するために必要な手段です。
-
Arc
: アトミック参照カウント型ポインタ。 Arcでラップされたデータは、特定の所有者がその値を早期に削除することを気にせずに、自由に共有できます。この意味では、値の所有権は「共有」になります。Arc内の値は変更できませんが、内部可変性がある場合があります。 -
Barrier
: これにより、複数のスレッドがプログラム内の特定のポイントに到達するまで互いに待機し、その後まとめて実行を継続します。 -
Condvar
: 条件変数で、イベントの発生を待機している間、スレッドをブロックする機能を提供します。 -
Mutex
: 相互排他メカニズムで、特定のデータに対して同時にアクセスできるスレッドが最大で 1 つであることを保証します。一般的に、 Mutexロックは、コード内の.awaitポイントにまたがって保持することは避けてください。
AWS SDK は async ランタイムに依存しないことを想定していますが、特定のケースでは tokio 同期オブジェクトを使用することをお勧めします。
例をより効率的に書き直す (シングルスレッド同時実行)
次の変更例では、futures_util::future::join_allget_object リクエストを同時に実行します。次のコマンドを実行して、プロジェクトに新しい依存関係を追加します。
-
cargo add futures-util
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }
これは同時実行のメリットを享受する最も簡単な方法ですが、最初はわかりにくい問題が一部あります。
-
すべてのリクエスト入力を同時に作成します。すべての
get_objectリクエスト入力を保持するのに十分なメモリがない場合、「メモリ不足」割り当てエラーが発生します。 -
すべての Future を同時に作成し、待機します。Amazon S3 では、一度に大量にダウンロードしようとするとリクエストをスロットリングします。
これらの問題を両方とも解決するには、一度に送信するリクエスト量を制限する必要があります。これを行うには、tokio セマフォ
use std::sync::Arc; use tokio::sync::Semaphore; const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }
リクエストの作成を async ブロックに移動することにより、潜在的なメモリ使用量の問題を修正しました。この方法により、リクエストは送信するタイミングまで作成されません。
注記
メモリに余裕がある場合は、すべてのリクエスト入力を一度に作成し、送信の準備ができるまでメモリに保持する方が効率的な場合があります。これを試すには、リクエスト入力の作成を async ブロックの外側に移動します。
また、転送中のリクエストを CONCURRENCY_LIMIT に制限することにより、一度に大量のリクエストが送信される問題を修正しました。
注記
CONCURRENCY_LIMIT の適切な値はプロジェクトごとに異なります。独自のリクエストを作成して送信する場合は、スロットリングエラーが発生しないように、できるだけ高く設定してください。サービスが返す成功レスポンスとスロットリングレスポンスの比率に基づいて、同時実行の制限を動的に更新することは可能ですが、その複雑さのため、このガイドでは範囲外となります。
例をより効率的に書き直す (マルチスレッド同時実行)
前の 2 つの例では、リクエストを同時に実行しました。これは同期的に実行するよりも効率的ですが、マルチスレッドを使用することにより、さらに効率が高まります。tokio でこれを行うには、それらを別々のタスクとして生成する必要があります。
注記
この例では、マルチスレッド tokio ランタイムを使用する必要があります。このランタイムは rt-multi-thread 機能の背後で制限されます。また、当然ですが、マルチコアマシンでプログラムを実行する必要があります。
次のコマンドを実行して、プロジェクトに新しい依存関係を追加します。
-
cargo add tokio --features=rt-multi-thread
// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }
作業をタスクに分割するのは複雑な場合があります。通常、I/O (入力/出力) の実行はブロックされます。ランタイムは、長時間実行されるタスクのニーズと短時間実行されるタスクのニーズのバランスをとるのが難しい場合があります。どのランタイムを選択する場合でも、作業をタスクに分割する最も効率的な方法についての推奨事項を必ず確認してください。tokio ランタイムの推奨事項については、「モジュール tokio::task
マルチスレッドアプリケーションのデバッグ
同時に実行されるタスクは、任意の順序で実行できます。そのため、同時プログラムのログは非常に読みにくい場合があります。SDK for Rust では、tracing ロギングシステムを使用することをお勧めします。実行中であっても、ログを特定のタスクでグループ化できます。ガイダンスについては、「AWS SDK for Rust でのログ記録の設定と使用」を参照してください。
ロックされたタスクを特定するための非常に便利なツールとして tokio-consoletokio-console アプリケーションを実行すると、プログラムが実行されているタスクのライブビューが表示されます。このビューには、タスクが共有リソースの取得を待機するのにかかった時間や、ポーリングされた時間などの有用な情報が含まれています。