Carga de flujos en Amazon S3 mediante AWS SDK for Java 2.x - AWS SDK for Java 2.x

Carga de flujos en Amazon S3 mediante AWS SDK for Java 2.x

Cuando se utiliza un flujo para cargar contenido en S3 utilizando putObject o uploadPart, se utiliza una clase de fábrica RequestBody para que la API sincrónica suministre el flujo. Para la API asincrónica, AsyncRequestBody es la clase de fábrica equivalente.

¿Qué métodos cargan flujos?

En la API sincrónica, puede usar los siguientes métodos de fábrica de RequestBody para suministrar el flujo:

  • fromInputStream(InputStream inputStream, long contentLength)

    fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType)

    • El ContentStreamProvider tiene el método de fábrica fromInputStream(InputStream inputStream)

  • fromContentProvider(ContentStreamProvider provider, String mimeType)

En la API asincrónica, puede usar los siguientes métodos de fábrica de AsyncRequestBody.

  • fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)

  • fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration)

    • Se utiliza AsyncRequestBodyFromInputStreamConfiguration.Builder para suministrar el flujo

  • fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration)

  • forBlockingInputStream(Long contentLength)

Realización de la carga

Si conoce la longitud del flujo

Como puede ver en la firma de los métodos mostrados anteriormente, la mayoría de los métodos aceptan un parámetro de longitud del contenido.

Si conoce la longitud del contenido en bytes, proporcione el valor exacto:

// Always provide the exact content length when it's available. long contentLength = 1024; // Exact size in bytes. s3Client.putObject(req -> req .bucket("amzn-s3-demo-bucket") .key("my-key"), RequestBody.fromInputStream(inputStream, contentLength));
aviso

Al cargar desde un flujo de entrada, si la longitud del contenido especificada no coincide con el recuento real de bytes, es posible que se produzca lo siguiente:

  • Objetos truncados si la longitud especificada es demasiado pequeña

  • Errores de carga o conexiones bloqueadas si la longitud especificada es demasiado grande

Si no conoce la longitud del flujo

Uso de la API sincrónica

Utilice la fromContentProvider(ContentStreamProvider provider, String mimeType):

public PutObjectResponse syncClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) { S3Client s3Client = S3Client.create(); RequestBody body = RequestBody.fromContentProvider(ContentStreamProvider.fromInputStream(inputStream), "text/plain"); PutObjectResponse putObjectResponse = s3Client.putObject(b -> b.bucket(BUCKET_NAME).key(KEY_NAME), body); return putObjectResponse; }

Dado que el SDK almacena en búfer de memoria todo el flujo para calcular la longitud del contenido, pueden producirse problemas de memoria con flujos grandes. Si necesita cargar flujos de gran tamaño con el cliente sincrónico, considere usar la API multiparte:

public static void uploadStreamToS3(String bucketName, String key, InputStream inputStream) { // Create S3 client S3Client s3Client = S3Client.create(); try { // Step 1: Initiate the multipart upload CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder() .bucket(bucketName) .key(key) .build(); CreateMultipartUploadResponse createResponse = s3Client.createMultipartUpload(createMultipartUploadRequest); String uploadId = createResponse.uploadId(); System.out.println("Started multipart upload with ID: " + uploadId); // Step 2: Upload parts List<CompletedPart> completedParts = new ArrayList<>(); int partNumber = 1; byte[] buffer = new byte[PART_SIZE]; int bytesRead; try { while ((bytesRead = readFullyOrToEnd(inputStream, buffer)) > 0) { // Create request to upload a part UploadPartRequest uploadPartRequest = UploadPartRequest.builder() .bucket(bucketName) .key(key) .uploadId(uploadId) .partNumber(partNumber) .build(); // If we didn't read a full buffer, create a properly sized byte array RequestBody requestBody; if (bytesRead < PART_SIZE) { byte[] lastPartBuffer = new byte[bytesRead]; System.arraycopy(buffer, 0, lastPartBuffer, 0, bytesRead); requestBody = RequestBody.fromBytes(lastPartBuffer); } else { requestBody = RequestBody.fromBytes(buffer); } // Upload the part and save the response's ETag UploadPartResponse uploadPartResponse = s3Client.uploadPart(uploadPartRequest, requestBody); CompletedPart part = CompletedPart.builder() .partNumber(partNumber) .eTag(uploadPartResponse.eTag()) .build(); completedParts.add(part); System.out.println("Uploaded part " + partNumber + " with size " + bytesRead + " bytes"); partNumber++; } // Step 3: Complete the multipart upload CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder() .parts(completedParts) .build(); CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder() .bucket(bucketName) .key(key) .uploadId(uploadId) .multipartUpload(completedMultipartUpload) .build(); CompleteMultipartUploadResponse completeResponse = s3Client.completeMultipartUpload(completeRequest); System.out.println("Multipart upload completed. Object URL: " + completeResponse.location()); } catch (Exception e) { // If an error occurs, abort the multipart upload System.err.println("Error during multipart upload: " + e.getMessage()); AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder() .bucket(bucketName) .key(key) .uploadId(uploadId) .build(); s3Client.abortMultipartUpload(abortRequest); System.err.println("Multipart upload aborted"); } finally { try { inputStream.close(); } catch (IOException e) { System.err.println("Error closing input stream: " + e.getMessage()); } } } finally { s3Client.close(); } } /** * Reads from the input stream into the buffer, attempting to fill the buffer completely * or until the end of the stream is reached. * * @param inputStream the input stream to read from * @param buffer the buffer to fill * @return the number of bytes read, or -1 if the end of the stream is reached before any bytes are read * @throws IOException if an I/O error occurs */ private static int readFullyOrToEnd(InputStream inputStream, byte[] buffer) throws IOException { int totalBytesRead = 0; int bytesRead; while (totalBytesRead < buffer.length) { bytesRead = inputStream.read(buffer, totalBytesRead, buffer.length - totalBytesRead); if (bytesRead == -1) { break; // End of stream } totalBytesRead += bytesRead; } return totalBytesRead > 0 ? totalBytesRead : -1; }
nota

Para la mayoría de los casos de uso, recomendamos utilizar la API de cliente asincrónica para los flujos de tamaño desconocido. Este método permite transferencias paralelas y ofrece una interfaz de programación más sencilla, ya que el SDK controla la segmentación de flujos en fragmentos multiparte si el flujo es grande.

Tanto el cliente asincrónico estándar de S3 con mutiparte habilitada como el cliente de S3 basado en AWS CRT implementan este método. Mostramos ejemplos de este método en la siguiente sección.

Uso de la API asincrónica

Puede proporcionar null para el argumento contentLength a fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor)

ejemplo utilizando el cliente asincrónico basado en AWS CRT:
public PutObjectResponse crtClient_stream_unknown_size(String bucketName, String key, InputStream inputStream) { S3AsyncClient s3AsyncClient = S3AsyncClient.crtCreate(); ExecutorService executor = Executors.newSingleThreadExecutor(); AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor); // 'null' indicates that the // content length is unknown. CompletableFuture<PutObjectResponse> responseFuture = s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body) .exceptionally(e -> { if (e != null){ logger.error(e.getMessage(), e); } return null; }); PutObjectResponse response = responseFuture.join(); // Wait for the response. executor.shutdown(); return response; }
ejemplo utilizando el cliente asincrónico estándar con multiparte habilitada:
public PutObjectResponse asyncClient_multipart_stream_unknown_size(String bucketName, String key, InputStream inputStream) { S3AsyncClient s3AsyncClient = S3AsyncClient.builder().multipartEnabled(true).build(); ExecutorService executor = Executors.newSingleThreadExecutor(); AsyncRequestBody body = AsyncRequestBody.fromInputStream(inputStream, null, executor); // 'null' indicates that the // content length is unknown. CompletableFuture<PutObjectResponse> responseFuture = s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body) .exceptionally(e -> { if (e != null) { logger.error(e.getMessage(), e); } return null; }); PutObjectResponse response = responseFuture.join(); // Wait for the response. executor.shutdown(); return response; }