本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
适用于 Rust 的 AWS SDK 中的并发性
适用于 Rust 的 AWS SDK 不提供并发控制,但用户有很多选择可以自行实现并发控制。
术语
与该主题相关的术语很容易混淆,有些术语虽然最初代表不同的概念,但现在已经变成了同义词。在本指南中,我们将定义以下内容:
-
任务:您的程序将运行至完成或尝试运行至完成的某些“工作单元”。
-
顺序计算:一个接一个地执行多个任务。
-
并行计算:在重叠的时间段内执行多个任务。
-
并发性:计算机以任意顺序完成多个任务的能力。
-
多任务处理:计算机同时运行多个任务的能力。
-
竞态条件:程序的行为根据任务启动时间或处理任务所需的时间而发生变化。
-
争用:因访问共享资源而发生的冲突。当两个或多个任务想要同时访问一个资源时,该资源即处于“争用状态”。
-
死锁:一种无法取得更多进展的状态。之所以发生这种情况,通常是因为两个任务都想要获取对方的资源,但在另一个任务的资源可用之前,两个任务都不会释放自己的资源。死锁会导致程序部分或完全没有响应。
一个简单示例
我们的第一个示例是一个顺序程序。在后面的示例中,我们将使用并发技术更改此代码。后面的示例重复使用相同的 build_client_and_list_objects_to_download() 方法并在 main() 中进行更改。运行以下命令来添加项目的依赖项:
-
cargo add aws-sdk-s3 -
cargo add aws-config tokio --features tokio/full
以下示例任务是下载 Amazon Simple Storage Service 存储桶中的所有文件:
-
首先列出所有文件。将密钥保存在列表中。
-
遍历列表,依次下载每个文件
use aws_sdk_s3::{Client, Error}; const EXAMPLE_BUCKET: &str = "amzn-s3-demo-bucket"; // Update to name of bucket you own. // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() -> (Client, Vec<String>) { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
注意
在这些示例中,我们不会处理错误,并且我们假设示例存储桶中没有密钥看起来像文件路径的对象。因此,我们不会介绍如何创建嵌套目录。
由于现代化计算机的架构,我们可以重写这个程序以提高效率。我们将在后面的示例中这样做,但首先,让我们再学习几个概念。
所有权和可变性
Rust 中的每个值都有一个所有者。当所有者超出范围时,其拥有的所有值也将被删除。所有者可以提供一个或多个指向某个值的不可变引用,或者提供一个可变引用。Rust 编译器负责确保任何引用的有效期都不会超过其所有者。
当多个任务需要以可变方式访问同一个资源时,需要进行额外的规划和设计。在顺序计算中,每个任务都能以可变方式访问相同的资源而不会发生争用,因为它们按顺序一个接一个地运行。但是,在并行计算中,任务可以按任意顺序同时运行。因此,我们必须做更多的工作来向编译器证明多个可变引用是不可能的(或者在发生这种情况时至少会导致崩溃)。
Rust 标准库提供了许多工具来帮助我们实现这一目标。有关这些主题的更多信息,请参阅《Rust 编程语言》一书中的变量与可变性
更多术语!
以下是“同步对象”列表。总而言之,它们是使编译器相信我们的并发程序不会违反所有权规则所必需的工具。
虽然 AWS SDK 旨在与 async 运行时无关,但我们建议在特定情况下使用 tokio 同步对象。
重写我们的示例以提高效率(单线程并发)
在以下修改后的示例中,我们使用 futures_util::future::join_allget_object请求。运行以下命令来添加项目的新依赖项。
-
cargo add futures-util
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }
这是从并发中受益的最简单方法,但它也有一些最初可能并不明显的问题:
-
我们同时创建所有请求输入。如果我们没有足够的内存来容纳所有
get_object请求输入,那么我们将遇到“内存不足”分配错误。 -
我们同时创造和等待所有 Future。如果我们试图一次下载过多内容,Amazon S3 会限制请求。
要解决这两个问题,我们必须限制同时发送的请求数量。我们将使用 tokio 信号量
use std::sync::Arc; use tokio::sync::Semaphore; const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }
我们通过将请求创建移到 async 数据块中,解决了潜在内存使用问题。这样,只有在需要发送请求时才会创建请求。
注意
如果内存足够,一次性创建所有请求输入并将它们保存在内存中,直到准备好发送为止,可能会更有效率。要尝试此操作,请将请求输入创建移到 async 数据块之外。
我们还通过将传输中的请求限制为发送至 CONCURRENCY_LIMIT,解决了同时发送过多请求的问题。
注意
每个项目的正确 CONCURRENCY_LIMIT 值都不一样。在构建和发送自己的请求时,尽量将其设置得尽可能高,以免遇到节流错误。虽然可以根据服务返回的成功响应与节流响应的比率动态更新并发限制,但由于其复杂性,这超出了本指南的范围。
重写我们的示例以提高效率(多线程并发)
在前两个示例中,我们并行执行了请求。虽然这比同步运行效率更高,但我们可以使用多线程来进一步提高效率。要使用 tokio 实现此目标,我们需要将它们作为单独的任务生成。
注意
此示例要求您使用多线程 tokio 运行时。此运行时需要启用 rt-multi-thread 功能才能使用。当然,您需要在多核机器上运行您的程序。
运行以下命令来添加项目的新依赖项。
-
cargo add tokio --features=rt-multi-thread
// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }
将工作划分为任务可能很复杂。执行 I/O(输入/输出)通常会导致阻塞。运行时可能很难在长时间运行的任务的需求和短期任务的需求之间取得平衡。无论您选择哪种运行时,请务必阅读其建议,以最有效的方式将您的工作划分为任务。有关 tokio 运行时建议,请参阅模块 tokio::task
调试多线程应用程序
并发运行的任务可以按任意顺序运行。因此,并发程序的日志可能很难读取。在适用于 Rust 的 SDK 中,我们建议使用 tracing 日志记录系统。它可以根据特定任务将日志分组,而无论它们何时运行。有关指南,请参阅在适用于 Rust 的 AWS SDK 中配置和使用日志记录。
识别已锁定任务的一个非常有用的工具是 tokio-consoletokio-console 应用程序,您可以看到程序正在运行的任务的实时视图。此视图包含有用的信息,例如任务等待获取共享资源所花费的时间或被轮询的次数。