Simultaneidad en AWS SDK para Rust - AWS SDK para Rust

Simultaneidad en AWS SDK para Rust

AWS SDK para Rust no proporciona control de simultaneidad, pero los usuarios tienen muchas opciones para implementar el suyo propio.

Términos

Los términos relacionados con este tema son fáciles de confundir y algunos términos se han convertido en sinónimos a pesar de que originalmente representaban conceptos distintos. En esta guía, se define lo siguiente:

  • Tarea: «unidad de trabajo» que el programa ejecutará hasta su finalización o intentará ejecutarla hasta completarla.

  • Computación secuencial: cuando se ejecutan varias tareas una tras otra.

  • Computación simultánea: cuando se ejecutan varias tareas en períodos de tiempo superpuestos.

  • Simultaneidad: capacidad de un equipo para completar varias tareas en un orden arbitrario.

  • Multitarea: capacidad de un equipo para ejecutar varias tareas al mismo tiempo.

  • Condición de carrera: cuando el comportamiento del programa cambia en función del momento en que se inicia una tarea o del tiempo que tarda en procesarla.

  • Contención: conflicto por el acceso a un recurso compartido. Cuando dos o más tareas desean acceder a un recurso al mismo tiempo, ese recurso está «en contención».

  • Interbloqueo: estado en el que no se puede avanzar más. Suele suceder porque dos tareas desean adquirir los recursos de la otra, pero ninguna de ellas liberará sus recursos hasta que el recurso de la otra esté disponible. Los interbloqueos hacen que un programa deje de responder de manera parcial o total.

Ejemplo sencillo

Nuestro primer ejemplo es un programa secuencial. En ejemplos posteriores, cambiaremos este código mediante técnicas de simultaneidad. Los ejemplos posteriores reutilizan el mismo método build_client_and_list_objects_to_download() y hacen cambios en main(). Ejecute el siguiente comando para agregar dependencias al proyecto:

  • cargo add aws-sdk-s3

  • cargo add aws-config tokio --features tokio/full

El siguiente ejemplo de tarea consiste en descargar todos los archivos de un bucket de Amazon Simple Storage Service:

  1. Comience por enumerar todos los archivos. Guarde las claves en una lista.

  2. Repase la lista y descargue cada archivo uno por uno.

use aws_sdk_s3::{Client, Error}; const EXAMPLE_BUCKET: &str = "amzn-s3-demo-bucket"; // Update to name of bucket you own. // This initialization function won't be reproduced in // examples following this one, in order to save space. async fn build_client_and_list_objects_to_download() -> (Client, Vec<String>) { let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let client = Client::new(&cfg); let objects_to_download: Vec<_> = client .list_objects_v2() .bucket(EXAMPLE_BUCKET) .send() .await .expect("listing objects succeeds") .contents() .into_iter() .flat_map(aws_sdk_s3::types::Object::key) .map(ToString::to_string) .collect(); (client, objects_to_download) }
#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; for object in objects_to_download { let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("reading body succeeds").into_bytes(); std::fs::write(object, body).expect("write succeeds"); } }
nota

En estos ejemplos, no trataremos los errores y daremos por sentado que el bucket de ejemplo no tiene objetos con claves que parezcan rutas de archivos. Por lo tanto, no abordaremos la creación de directorios anidados.

Debido a la arquitectura de los equipos modernos, podemos reescribir este programa para que sea mucho más eficiente. Lo haremos en un ejemplo posterior, pero primero, veamos algunos conceptos más.

Propiedad y mutabilidad

Cada valor en Rust tiene un único propietario. Cuando un propietario queda fuera del ámbito de aplicación, también se eliminarán todos los valores que posea. El propietario puede proporcionar una o varias referencias inmutables a un valor o a una única referencia mutable. El compilador de Rust se encarga de garantizar que ninguna referencia sobreviva a su propietario.

Se necesita una planificación y un diseño adicionales cuando varias tareas necesitan acceder de forma mutable al mismo recurso. En la computación secuencial, cada tarea puede acceder de forma mutable al mismo recurso sin problemas, ya que se ejecutan una tras otra en una secuencia. Sin embargo, en la computación simultánea, las tareas se pueden ejecutar en cualquier orden y al mismo tiempo. Por lo tanto, debemos hacer más para demostrarle al compilador que es imposible hacer múltiples referencias mutables (o, al menos, que se produzca un fallo si se dan).

La biblioteca estándar de Rust proporciona muchas herramientas para ayudarnos a lograrlo. Para obtener más información sobre estos temas, consulte Variables y mutabilidad y Descripción de la propiedad en el libro Lenguaje de programación Rust.

¡Más términos!

A continuación se enumeran los «objetos de sincronización». En conjunto, son las herramientas necesarias para convencer al compilador de que nuestro programa simultáneo no infringirá las reglas de propiedad.

Objetos de sincronización de la biblioteca estándar:

  • Arco: un puntero con recuento de referencias atómico. Cuando los datos están encapsulados en un Arc, se pueden compartir libremente, sin preocuparse de que ningún propietario específico elimine el valor antes de tiempo. En este sentido, la propiedad del valor pasa a ser «compartida». Los valores dentro de un Arc no pueden ser mutables, pero pueden tener mutabilidad interior.

  • Barrera: garantiza que varios subprocesos esperen a que los demás lleguen a un punto del programa antes de continuar con la ejecución conjunta.

  • Condvar: variable de condición que permite bloquear un subproceso mientras se espera a que ocurra un evento.

  • Mutex: mecanismo de exclusión mutua que garantiza que, como máximo, un subproceso pueda acceder a algunos datos. En términos generales, un bloqueo Mutex unca debe mantenerse en un punto .await del código.

Objetos de sincronización Tokio:

Si bien los SDK de AWS están diseñados para ser async independientes del tiempo de ejecución, recomendamos el uso de objetos de sincronización tokio para casos específicos.

  • Mutex: similar al Mutex de la biblioteca estándar, pero con un coste ligeramente superior. A diferencia del Mutex estándar, este se puede mantener en un punto .await del código.

  • Sempahore: variable que se utiliza para controlar el acceso a un recurso común por parte de varias tareas.

Reescritura de nuestro ejemplo para que sea más eficiente (simultaneidad de un solo subproceso)

En el siguiente ejemplo modificado, utilizamos futures_util::future::join_all para ejecutar TODAS las solicitudes get_object de forma simultánea. Ejecute el siguiente comando para agregar una nueva dependencia al proyecto.

  • cargo add futures-util

#[tokio::main] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let get_object_futures = objects_to_download.into_iter().map(|object| { let req = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET); async { let res = req .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); // Note that we MUST use the async runtime's preferred way // of writing files. Otherwise, this call would block, // potentially causing a deadlock. tokio::fs::write(object, body).await.expect("write succeeds"); } }); futures_util::future::join_all(get_object_futures).await; }

Esta es la forma más sencilla de beneficiarse de la simultaneidad, pero también presenta algunos problemas que pueden no resultar evidentes a primera vista:

  1. Creamos todas las entradas de solicitud al mismo tiempo. Si no tenemos suficiente memoria para almacenar todas las entradas de la solicitud get_object, se producirá un error de asignación por «falta de memoria».

  2. Al mismo tiempo, creamos y esperamos todos los futuros. Amazon S3 limita las solicitudes si intentamos descargar demasiadas de una sola vez.

Para solucionar estos dos problemas, debemos limitar la cantidad de solicitudes que enviamos en un momento dado. Lo haremos con un semáforo tokio:

use std::sync::Arc; use tokio::sync::Semaphore; const CONCURRENCY_LIMIT: usize = 50; #[tokio::main(flavor = "current_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); let get_object_futures = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); } }); futures_util::future::join_all(get_object_futures).await; }

Hemos solucionado el posible problema de uso de la memoria trasladando la creación de solicitudes al bloque async. De esta forma, las solicitudes no se crearán hasta que llegue el momento de enviarlas.

nota

Si dispone de memoria suficiente, puede que sea más eficiente crear todas las entradas de solicitud a la vez y mantenerlas en la memoria hasta que estén listas para enviarse. Para intentarlo, traslade la creación de entradas de solicitudes fuera del bloque async.

También hemos solucionado el problema del envío de demasiadas solicitudes a la vez limitando las solicitudes en tránsito a CONCURRENCY_LIMIT.

nota

El valor correcto de CONCURRENCY_LIMIT es diferente para cada proyecto. Al crear y enviar sus propias solicitudes, intente establecerlo lo más alto que pueda sin que se produzcan errores de limitación. Si bien es posible actualizar de forma dinámica el límite de simultaneidad en función de la proporción entre las respuestas correctas y las limitadas que devuelve un servicio, esto queda fuera del ámbito de esta guía debido a su complejidad.

Reescritura de nuestro ejemplo para que sea más eficiente (simultaneidad multiproceso)

En los dos ejemplos anteriores, realizamos nuestras solicitudes de forma simultánea. Aunque esto es más eficiente que ejecutarlas de forma sincrónica, podemos hacer que sean aún más eficientes mediante el uso de varios subprocesos. Para hacerlo con tokio, tendremos que generarlos como tareas independientes.

nota

Este ejemplo requiere que utilice el tiempo de ejecución multiproceso de tokio. Este tiempo de ejecución está protegido por la función rt-multi-thread. Y, por supuesto, tendrás que ejecutar su programa en una máquina de varios núcleos.

Ejecute el siguiente comando para agregar una nueva dependencia al proyecto.

  • cargo add tokio --features=rt-multi-thread

// Set this based on the amount of cores your target machine has. const THREADS: usize = 8; #[tokio::main(flavor = "multi_thread")] async fn main() { let (client, objects_to_download) = build_client_and_list_objects_to_download().await; let concurrency_semaphore = Arc::new(Semaphore::new(THREADS)); let get_object_task_handles = objects_to_download.into_iter().map(|object| { // Since each future needs to acquire a permit, we need to clone // the Arc'd semaphore before passing it in. let semaphore = concurrency_semaphore.clone(); // We also need to clone the client so each task has its own handle. let client = client.clone(); // Note this difference! We're using `tokio::task::spawn` to // immediately begin running these requests. tokio::task::spawn(async move { let permit = semaphore .acquire() .await .expect("we'll get a permit if we wait long enough"); let res = client .get_object() .key(&object) .bucket(EXAMPLE_BUCKET) .send() .await .expect("get_object succeeds"); let body = res.body.collect().await.expect("body succeeds").into_bytes(); tokio::fs::write(object, body).await.expect("write succeeds"); std::mem::drop(permit); }) }); futures_util::future::join_all(get_object_task_handles).await; }

La división del trabajo en tareas puede resultar compleja. Las operaciones de E/S (entrada/salida) suelen bloquearse. Los tiempos de ejecución pueden tener dificultades para equilibrar las necesidades de las tareas de larga duración con las de las tareas de corta duración. Sea cual sea el tiempo de ejecución que elija, asegúrese de leer sus recomendaciones para encontrar la forma más eficiente de dividir el trabajo en tareas. Para ver las recomendaciones sobre el tiempo de ejecución tokio, consulte Módulo tokio::task.

Depuración de aplicaciones con varios subprocesos

Las tareas que se ejecutan de forma simultánea se pueden ejecutar en cualquier orden. Por lo tanto, los registros de los programas simultáneos pueden resultar muy difíciles de leer. En el SDK de Rust, recomendamos utilizar el sistema de registro tracing. Puede agrupar los registros con sus tareas específicas, sin importar cuándo se ejecuten. Para obtener instrucciones, consulte Configuración y uso del registro en AWS SDK de Rust.

Una herramienta muy útil para identificar las tareas que se han bloqueado es tokio-console, una herramienta de diagnóstico y depuración de programas asincrónicos de Rust. Al instrumentar y ejecutar el programa, y luego ejecutar la aplicación tokio-console, puede ver una vista en tiempo real de las tareas que está ejecutando el programa. Esta vista incluye información útil, como la cantidad de tiempo que una tarea ha dedicado a esperar para adquirir recursos compartidos o la cantidad de veces que se ha sondeado.