기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
AWS SDK for Rust의 동시성
AWS SDK for Rust는 동시성 제어를 제공하지 않지만 사용자에게는 자체 구현을 위한 다양한 옵션이 있습니다.
용어
이 주제와 관련된 용어는 혼동하기 쉬우며 일부 용어는 원래 별도의 개념을 나타내었지만 동의어가 되었습니다. 이 가이드에서는 다음을 정의합니다.
-
작업: 프로그램이 완료될 때까지 실행되거나 완료될 때까지 실행을 시도하는 일련의 ‘작업 단위’입니다.
-
순차 컴퓨팅: 여러 작업이 차례로 실행되는 경우입니다.
-
동시 컴퓨팅: 여러 작업이 중첩되는 기간에 실행되는 경우입니다.
-
동시성: 컴퓨터가 임의의 순서로 여러 작업을 완료할 수 있는 기능입니다.
-
멀티태스킹: 컴퓨터가 여러 작업을 동시에 실행하는 기능입니다.
-
레이스 조건: 작업이 시작된 시간 또는 작업을 처리하는 데 걸리는 시간에 따라 프로그램의 동작이 변경되는 경우입니다.
-
경합: 공유 리소스에 대한 액세스와 충돌합니다. 둘 이상의 작업이 리소스에 동시에 액세스하려는 경우 해당 리소스는 ‘경합’ 상태에 있습니다.
-
교착 상태: 더 이상 진행할 수 없는 상태입니다. 이는 일반적으로 두 작업이 서로의 리소스를 획득하려고 하지만 두 작업 모두 다른 작업의 리소스를 사용할 수 있을 때까지 리소스를 해제하지 않기 때문에 발생합니다. 교착 상태로 인해 프로그램이 부분적으로 또는 완전히 응답하지 않게 됩니다.
간단한 예제
첫 번째 예제는 순차적 프로그램입니다. 이후 예제에서는 동시성 기술을 사용하여 이 코드를 변경하겠습니다. 이후 예제에서는 동일한 build_client_and_list_objects_to_download() 메서드를 재사용하고 main() 내에서 변경합니다. 다음 명령을 실행하여 프로젝트의 종속성을 설치하세요.
-
cargo add aws-sdk-s3 -
cargo add aws-config tokio --features tokio/full
다음 예제 작업은 Amazon Simple Storage Service 버킷의 모든 파일을 다운로드하는 것입니다.
-
먼저 모든 파일을 나열합니다. 목록에 키를 저장합니다.
-
목록을 반복하여 각 파일을 차례로 다운로드합니다.
use aws_sdk_s3::{Client, Error}; const EXAMPLE_BUCKET: &str = "amzn-s3-demo-bucket"; // Update to name of bucket you own. // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() -> (Client, Vec<String>) { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
참고
이 예제에서는 오류를 처리하지 않으며 예제 버킷에 파일 경로처럼 보이는 키가 있는 객체가 없다고 가정합니다. 따라서 중첩 디렉터리 생성은 다루지 않습니다.
최신 컴퓨터의 아키텍처로 인해 이 프로그램을 훨씬 더 효율적으로 다시 작성할 수 있습니다. 이후 예제에서는 이 작업을 수행하지만 먼저 몇 가지 개념을 더 살펴보겠습니다.
소유권 및 변경 가능성
Rust의 각 값에는 단일 소유자가 있습니다. 소유자가 범위를 벗어나면 소유한 모든 값도 삭제됩니다. 소유자는 값에 대한 변경 불가능한 참조 하나 이상 또는 변경 가능한 참조 하나를 제공할 수 있습니다. Rust 컴파일러는 참조가 소유자보다 오래 지속되지 않도록 할 책임이 있습니다.
여러 작업이 동일한 리소스에 변경 가능하게 액세스해야 하는 경우 추가 계획 및 설계가 필요합니다. 순차 컴퓨팅에서 각 작업은 시퀀스에서 차례로 실행되므로 경합 없이 동일한 리소스에 변경 가능하게 액세스할 수 있습니다. 그러나 동시 컴퓨팅에서는 작업이 어떤 순서로든 동시에 실행될 수 있습니다. 따라서 여러 개의 변경 가능한 참조가 불가능하다는 것을 컴파일러에 증명하기 위해(또는 적어도 충돌이 발생하는 경우 충돌을 증명하기 위해) 더 많은 작업을 수행해야 합니다.
Rust 표준 라이브러리는 이를 달성하는 데 도움이 되는 다양한 도구를 제공합니다. 이러한 주제에 대한 자세한 내용은 Rust 프로그래밍 언어 북의 변수 및 변경 가능성
더 많은 용어를 소개합니다!
다음은 ‘동기화 객체’의 목록입니다. 또한 동시 프로그램이 소유권 규칙을 위반하지 않을 것이라고 컴파일러를 설득하는 데 필요한 도구입니다.
-
Arc
: Atomically Reference-Counted 포인터입니다. 데이터가 Arc에 래핑되면 특정 소유자가 값을 조기에 삭제할 염려 없이 자유롭게 공유할 수 있습니다. 이러한 의미에서 값의 소유권은 ‘공유’가 됩니다.Arc내의 값은 변경할 수 없지만 내부는 변경할 수 있습니다. -
장벽
: 여러 스레드가 프로그램의 특정 지점에 도달할 때까지 기다렸다가 계속 실행합니다. -
Condvar
: 이벤트가 발생할 때까지 기다리는 동안 스레드를 차단하는 기능을 제공하는 Condition Variable입니다. -
뮤텍스
: 한 번에 최대 하나의 스레드가 일부 데이터에 액세스할 수 있도록 하는 Mutual Exclusion 메커니즘입니다. 일반적으로 Mutex잠금은 코드의 한.await지점에서 보유해서는 안 됩니다.
AWS SDK는 async 런타임에 구애받지 않도록 설계되었지만 특정 경우에 tokio 동기화 객체를 사용하는 것이 좋습니다.
더 효율적으로 예제를 다시 작성(단일 스레드 동시성)
다음 수정된 예제에서는 futures_util::future::join_allget_object 요청을 동시에 실행합니다. 다음 명령을 실행하여 프로젝트의 새 종속성을 설치하세요.
-
cargo add futures-util
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }
이는 동시성을 활용하는 가장 간단한 방법이지만, 처음에는 명확하지 않을 수 있는 몇 가지 문제도 있습니다.
-
모든 요청 입력을 동시에 생성합니다. 모든
get_object요청 입력을 보관할 메모리가 충분하지 않으면 ‘out-of-memory’ 할당 오류가 발생합니다. -
모든 future를 동시에 생성하고 기다립니다. Amazon S3는 한 번에 너무 많이 다운로드하려고 하면 요청을 스로틀링합니다.
이 두 가지 문제를 모두 해결하려면 한 번에 보내는 요청의 양을 제한해야 합니다. tokio semaphore
use std::sync::Arc; use tokio::sync::Semaphore; const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }
요청 생성을 async 블록으로 이동하여 잠재적인 메모리 사용량 문제를 해결했습니다. 이렇게 하면 요청을 보낼 때까지 요청이 생성되지 않습니다.
참고
메모리가 있는 경우 모든 요청 입력을 한 번에 생성하고 전송할 준비가 될 때까지 메모리에 보관하는 것이 더 효율적일 수 있습니다. 이렇게 하려면 요청 입력 생성을 async 블록 외부로 이동합니다.
또한 진행 중인 요청을 CONCURRENCY_LIMIT로 제한하여 한 번에 너무 많은 요청을 보내는 문제를 해결했습니다.
참고
CONCURRENCY_LIMIT의 올바른 값은 프로젝트마다 다릅니다. 자체 요청을 구성하고 전송할 때 제한 오류가 발생하지 않도록 최대한 높게 설정해 보세요. 서비스가 반송하는 응답의 성공과 제한된 응답의 비율을 기반으로 동시성 제한을 동적으로 업데이트할 수 있지만 복잡성으로 인해 이 가이드의 범위를 벗어납니다.
더 효율적으로 예제를 다시 작성(다중 스레드 동시성)
이전 두 예제에서는 요청을 동시에 수행했습니다. 이는 동기식으로 실행하는 것보다 효율적이지만 멀티스레딩을 사용하여 사물을 훨씬 더 효율적으로 만들 수 있습니다. tokio를 사용하여 이 작업을 수행하려면 별도의 작업으로 생성해야 합니다.
참고
이 예제에서는 다중 스레드 tokio 런타임을 사용해야 합니다. 이 런타임은 rt-multi-thread 기능 뒤에서 제한됩니다. 물론 멀티 코어 시스템에서 프로그램을 실행해야 합니다.
다음 명령을 실행하여 프로젝트의 새 종속성을 설치하세요.
-
cargo add tokio --features=rt-multi-thread
// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }
작업(work)을 작업(task) 단위로 나누는 것은 복잡할 수 있습니다. I/O(입력/출력)를 수행하는 것은 일반적으로 차단됩니다. 런타임은 장기 실행 작업의 요구 사항과 단기 실행 작업의 요구 사항의 균형을 맞추는 데 어려움을 겪을 수 있습니다. 어떤 런타임을 선택하든 작업(work)을 작업(task) 단위로 나누는 가장 효율적인 방법을 위해 권장 사항을 읽어야 합니다. tokio 런타임 권장 사항은 모듈 tokio::task
다중 스레드 앱 디버깅
동시에 실행되는 작업은 어떤 순서로든 실행할 수 있습니다. 따라서 동시 프로그램의 로그를 읽기가 매우 어려울 수 있습니다. SDK for Rust에서는 tracing 로깅 시스템을 사용하는 것이 좋습니다. 실행 시점에 관계없이 로그를 특정 작업으로 그룹화할 수 있습니다. 자세한 지침은 AWS SDK for Rust에서 로깅 구성 및 사용을 참조하세요.
잠긴 작업을 식별하는 데 매우 유용한 도구는 비동기 Rust 프로그램을 위한 진단 및 디버깅 도구인 tokio-consoletokio-console 앱을 실행하면 프로그램이 실행 중인 작업을 실시간으로 볼 수 있습니다. 이 보기에는 작업이 공유 리소스 획득을 기다리는 데 소요된 시간 또는 폴링된 시간과 같은 유용한 정보가 포함되어 있습니다.