Simultaneidade no AWS SDK para Rust
O AWS SDK para Rust não fornece controle de simultaneidade, mas os usuários têm muitas opções para implementar seus próprios controles.
Termos
Os termos relacionados a esse assunto são fáceis de confundir, e alguns deles até se tornaram sinônimos, embora originalmente representassem conceitos diferentes. Neste guia, definiremos o seguinte:
-
Tarefa: uma “unidade de trabalho” que seu programa executará ou tentará executar até a conclusão.
-
Computação sequencial: quando várias tarefas são executadas uma após a outra.
-
Computação simultânea: quando várias tarefas são executadas em períodos sobrepostos.
-
Simultaneidade: a capacidade de um computador concluir várias tarefas em uma ordem arbitrária.
-
Multitarefa: a capacidade de um computador executar várias tarefas simultaneamente.
-
Condição de corrida: quando o comportamento do seu programa muda com base no momento em que uma tarefa é iniciada ou no tempo necessário para processá-la.
-
Contenção: conflito sobre o acesso a um recurso compartilhado. Quando duas ou mais tarefas desejam acessar um recurso ao mesmo tempo, esse recurso está “em contenção”.
-
Deadlock: um estado em que não é possível progredir. Isso normalmente acontece porque duas tarefas desejam adquirir os recursos uma da outra, mas nenhuma delas liberará seus recursos até que o recurso da outra esteja disponível. Os deadlocks fazem com que um programa fique parcial ou totalmente sem resposta.
Um exemplo simples
Nosso primeiro exemplo é um programa sequencial. Em exemplos posteriores, alteraremos esse código usando técnicas de simultaneidade. Exemplos posteriores reutilizam o mesmo método build_client_and_list_objects_to_download() e fazem alterações em main(). Execute os seguintes comandos para adicionar dependências ao seu projeto:
-
cargo add aws-sdk-s3 -
cargo add aws-config tokio --features tokio/full
O seguinte exemplo de tarefa é para baixar todos os arquivos em um bucket do Amazon Simple Storage Service:
-
Comece listando todos os arquivos. Salve as chaves em uma lista.
-
Itere sobre a lista, baixando um arquivo por vez
use aws_sdk_s3::{Client, Error}; const EXAMPLE_BUCKET: &str = "amzn-s3-demo-bucket"; // Update to name of bucket you own. // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() -> (Client, Vec<String>) { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
nota
Nesses exemplos, não trataremos de erros e presumimos que o bucket de exemplo não tenha objetos com chaves que pareçam com caminhos de arquivo. Portanto, não abordaremos a criação de diretórios aninhados.
Devido à arquitetura dos computadores modernos, podemos reescrever esse programa para ser muito mais eficiente. Faremos isso em um exemplo posterior, mas primeiro, vamos aprender mais alguns conceitos.
Propriedade e mutabilidade
Cada valor no Rust tem um único proprietário. Quando um proprietário sai do escopo, todos os valores que ele possui também são descartados. O proprietário pode fornecer uma ou mais referências imutáveis a um valor ou uma única referência mutável. O compilador do Rust é responsável por garantir que nenhuma referência ultrapasse o limite do proprietário.
Planejamento e design adicionais são necessários quando várias tarefas precisam acessar mutavelmente o mesmo recurso. Na computação sequencial, cada tarefa pode acessar mutavelmente o mesmo recurso sem contenção porque elas são executadas uma após a outra em sequência. No entanto, na computação simultânea, as tarefas podem ser executadas em qualquer ordem e ao mesmo tempo. Portanto, devemos fazer mais para provar ao compilador que não é possível executar várias referências mutáveis (ou pelo menos travar se ocorrerem).
A biblioteca padrão do Rust fornece muitas ferramentas para ajudar. Para obter mais informações sobre esses tópicos, consulte Variables and Mutability
Mais termos!
As listas a seguir são de “objetos de sincronização”. Juntas, elas são as ferramentas necessárias para convencer o compilador de que nosso programa simultâneo não violará as regras de propriedade.
Objetos de sincronização de biblioteca padrão
-
Arc
: um ponteiro contador referenciado atomicamente. Quando os dados são agrupados em um Arc, eles podem ser compartilhados livremente, sem se preocupar com nenhum proprietário específico descartando o valor mais cedo. Nesse sentido, a propriedade do valor se torna “compartilhada”. Os valores em umArcnão podem ser mutáveis, mas podem ter mutabilidade interna. -
Barrier
: garante que vários encadeamentos aguardem uns aos outros para chegar a um ponto no programa antes de continuarem a execução juntos. -
Condvar
: uma variável de condição que fornece a capacidade de bloquear um encadeamento enquanto se espera outro evento ocorrer. -
Mutex
: um mecanismo de exclusão mútua que garante que no máximo um encadeamento por vez possa acessar alguns dados. De modo geral, um bloqueio de Mutexnunca deve ser colocado em um ponto.awaitdo código.
Objetos de sincronização do Tokio
Embora os AWS SDKs tenham a intenção de ser async independentes de runtime, recomendamos usar objetos de sincronização do tokio para casos específicos.
Como reescrever o exemplo para ser mais eficiente (simultaneidade de thread único)
No exemplo modificado a seguir, usamos futures_util::future::join_allget_object simultaneamente. Execute o seguinte comando para adicionar uma nova dependência ao projeto:
-
cargo add futures-util
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }
Essa é a maneira mais simples de se beneficiar da simultaneidade, mas também tem alguns problemas que podem não ser óbvios à primeira vista:
-
Criamos todas as entradas de solicitação ao mesmo tempo. Se não houver memória suficiente para armazenar todas as entradas da solicitação
get_object, um erro de alocação “sem memória” será exibido. -
Criamos e aguardamos todos os futuros ao mesmo tempo. O Amazon S3 limita as solicitações de controle de utilização se tentarmos baixar muitos recursos de uma vez.
Para corrigir esses problemas, precisamos limitar a quantidade de solicitações que estamos enviando a qualquer momento. Faremos isso com um semáforotokio:
use std::sync::Arc; use tokio::sync::Semaphore; const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }
Corrigimos o possível problema de uso de memória movendo a criação da solicitação para o bloco async. Dessa forma, as solicitações não serão criadas até a hora de serem enviadas.
nota
Se você tiver memória, talvez seja mais eficiente criar todas as entradas de sua solicitação de uma só vez e mantê-las na memória até que estejam prontas para serem enviadas. Para tentar isso, mova a criação de entrada da solicitação para fora do bloco async.
Também corrigimos o problema de enviar muitas solicitações de uma só vez, limitando as solicitações em andamento a CONCURRENCY_LIMIT.
nota
O valor certo para CONCURRENCY_LIMIT é diferente para cada projeto. Ao criar e enviar suas próprias solicitações, tente configurá-las o mais alto possível sem enfrentar erros de controle de utilização. Embora seja possível atualizar dinamicamente o limite de simultaneidade com base na proporção de respostas de controle de utilização bem-sucedidas e limitadas que um serviço envia de volta, isso está fora do escopo deste guia devido à sua complexidade.
Como reescrever o exemplo para ser mais eficiente (simultaneidade multiencadeada)
Nos dois exemplos anteriores, realizamos solicitações simultaneamente. Embora isso seja mais eficiente do que executá-los de forma síncrona, podemos tornar os processos ainda mais eficientes usando multiencadeação. Para fazer isso com tokio, precisaremos gerá-las como tarefas separadas.
nota
Este exemplo exige que você use o runtime multiencadeado tokio. Esse runtime é fechado atrás do recurso rt-multi-thread. Você precisará executar o programa em uma máquina com vários núcleos.
Execute o seguinte comando para adicionar uma nova dependência ao projeto:
-
cargo add tokio --features=rt-multi-thread
// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }
Dividir o trabalho em tarefas pode ser complexo. Fazer E/S (entrada/saída) normalmente causa um bloqueio. Os runtimes podem ter dificuldade em equilibrar as necessidades de tarefas de longa duração com as de tarefas de curta duração. Seja qual for o runtime escolhido, leia as recomendações deles sobre a maneira mais eficiente de dividir seu trabalho em tarefas. Para obter as recomendações do runtime tokio, consulte Módulo tokio::task
Depurar aplicações com vários segmentos
As tarefas executadas simultaneamente podem ser executadas em qualquer ordem. Dessa forma, os logs de programas simultâneos podem ser muito difíceis de ler. No SDK para Rust, recomendamos usar o sistema de registro em log tracing. Ele pode agrupar logs com suas tarefas específicas, não importa quando estejam em execução. Para obter orientações, consulte Configurar e usar o registro em log no AWS SDK para Rust.
Uma ferramenta muito útil para identificar tarefas bloqueadas é tokio-consoletokio-console, é possível exibir uma visualização ao vivo das tarefas que o programa está executando. Essa exibição inclui informações úteis, como quanto tempo uma tarefa gastou esperando para adquirir recursos compartilhados ou a quantas vezes ela foi pesquisada.