AWS SDK for C++

AWS SDK for C++ Version 1.11.745

Loading...
Searching...
No Matches
TransferHandle.h
1
6#pragma once
7
8#include <aws/transfer/Transfer_EXPORTS.h>
9#include <aws/core/utils/memory/stl/AWSString.h>
10#include <aws/core/utils/memory/stl/AWSSet.h>
11#include <aws/core/utils/memory/stl/AWSMap.h>
12#include <aws/core/utils/UUID.h>
13#include <aws/core/client/AWSError.h>
14#include <aws/core/client/AsyncCallerContext.h>
15#include <aws/s3/S3Errors.h>
16#include <aws/s3/model/ChecksumAlgorithm.h>
17#include <iostream>
18#include <atomic>
19#include <mutex>
20#include <condition_variable>
21
22namespace Aws
23{
24 namespace Utils
25 {
26 template < typename T > class Array;
27 }
28
29 namespace Transfer
30 {
31 class TransferHandle;
32
33 typedef std::function<Aws::IOStream*(void)> CreateDownloadStreamCallback;
34
35 static const char CLASS_TAG[] = "TransferManager";
36
38 {
40 versionId("")
41 {}
42
44
45 // TBI (to be implemented): controls for in-memory parts vs. resumable file-based parts with state serialization to/from file
46 };
47
48 class AWS_TRANSFER_API PartState
49 {
50 public:
52 PartState(int partId, uint64_t bestProgressInBytes, uint64_t sizeInBytes, bool lastPart = false);
53
54 int GetPartId() const { return m_partId; }
55
56 uint64_t GetBestProgressInBytes() const { return m_bestProgressInBytes; }
57 void SetBestProgressInBytes(uint64_t progressInBytes) { m_bestProgressInBytes = progressInBytes; }
58
59 uint64_t GetSizeInBytes() const { return m_sizeInBytes; }
60 void SetSizeInBytes(uint64_t sizeInBytes) { m_sizeInBytes = sizeInBytes; }
61
62 void Reset();
63
64 void OnDataTransferred(uint64_t amount, const std::shared_ptr<TransferHandle> &transferHandle);
65
66 void SetETag(const Aws::String& eTag) { m_eTag = eTag; }
67 const Aws::String& GetETag() const { return m_eTag; }
68
69 Aws::IOStream *GetDownloadPartStream() const { return m_downloadPartStream; }
70 void SetDownloadPartStream(Aws::IOStream *downloadPartStream) { m_downloadPartStream = downloadPartStream; }
71
72 unsigned char* GetDownloadBuffer() const { return m_downloadBuffer; }
73 void SetDownloadBuffer(unsigned char* downloadBuffer) { m_downloadBuffer = downloadBuffer; }
74
75 void SetRangeBegin(uint64_t rangeBegin) { m_rangeBegin = rangeBegin; }
76 uint64_t GetRangeBegin() const { return m_rangeBegin; }
77
78 bool IsLastPart() { return m_lastPart; }
79 void SetLastPart() { m_lastPart = true; }
80
81 Aws::String GetChecksum() const { return m_checksum; };
82 void SetChecksum(const Aws::String& checksum) { m_checksum = checksum; }
83 private:
84
85 int m_partId = 0;
86
87 Aws::String m_eTag;
88 uint64_t m_currentProgressInBytes = 0;
89 uint64_t m_bestProgressInBytes = 0;
90 uint64_t m_sizeInBytes = 0;
91 uint64_t m_rangeBegin = 0;
92
93 std::atomic<Aws::IOStream *> m_downloadPartStream;
94 std::atomic<unsigned char*> m_downloadBuffer;
95 bool m_lastPart = false;
96 Aws::String m_checksum;
97 };
98
99 using PartPointer = std::shared_ptr< PartState >;
101
102 enum class TransferStatus
103 {
104 //this value is only used for directory synchronization
106 //Operation is still queued and has not begun processing
108 //Operation is now running
110 //Operation was canceled. A Canceled operation can still be retried
111 CANCELED,
112 //Operation failed. A failed operation can still be retried.
113 FAILED,
114 //Operation was successful
115 COMPLETED,
116 //Operation either failed or was canceled and a user deleted the multipart upload from S3.
117 ABORTED
118 };
119
121 {
122 UPLOAD,
124 };
125
133 class AWS_TRANSFER_API TransferHandle
134 {
135 public:
// Constructor taking bucket, key, a total size in bytes and an optional target file path.
// NOTE(review): presumably the upload form, since it is the only overload given a size up front — confirm against the implementation.
139 TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, uint64_t totalSize, const Aws::String& targetFilePath = "");
140
// Constructor taking bucket, key and an optional target file path only.
// NOTE(review): presumably a download form (no size known in advance) — confirm.
144 TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, const Aws::String& targetFilePath = "");
145
// Constructor that additionally takes a CreateDownloadStreamCallback, i.e. a callable returning an Aws::IOStream*.
149 TransferHandle(const Aws::String& bucketName, const Aws::String& keyName, CreateDownloadStreamCallback createDownloadStreamFn, const Aws::String& targetFilePath = "");
150
// Constructor that takes a stream callback plus a byte offset and a byte count.
// NOTE(review): fileOffset/downloadBytes presumably select a sub-range of the object — confirm.
154 TransferHandle(const Aws::String& bucketName, const Aws::String& keyName,
155 const uint64_t fileOffset, const uint64_t downloadBytes,
156 CreateDownloadStreamCallback createDownloadStreamFn, const Aws::String& targetFilePath = "");
157
158
160
164 inline bool IsMultipart() const { return m_isMultipart.load(); }
168 inline void SetIsMultipart(bool value) { m_isMultipart.store(value); }
172 inline const Aws::String GetMultiPartId() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_multipartId; }
176 inline void SetMultipartId(const Aws::String& value) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_multipartId = value; }
// Part-tracking and control declarations. The class holds four PartStateMap members
// (completed, pending, queued, failed); the bodies of the functions below are not in
// this header, so the notes are assumptions to confirm against the implementation file.

// Takes a part and an ETag; presumably records the ETag and files the part as completed — confirm.
184 void ChangePartToCompleted(const PartPointer& partState, const Aws::String &eTag);
// Query / insert for pending parts.
192 bool HasPendingParts() const;
196 void AddPendingPart(const PartPointer& partState);
// Query / insert for queued parts.
204 bool HasQueuedParts() const;
208 void AddQueuedPart(const PartPointer& partState);
// Query / transition for failed parts.
216 bool HasFailedParts() const;
220 void ChangePartToFailed(const PartPointer& partState);
// Fills the four caller-supplied maps through non-const references.
// NOTE(review): the name suggests a single consistent snapshot — confirm how locking is done.
224 void GetAllPartsTransactional(PartStateMap& queuedParts, PartStateMap& pendingParts,
225 PartStateMap& failedParts, PartStateMap& completedParts);
229 bool HasParts() const;
// NOTE(review): presumably reflects the cancel flag and/or status — confirm.
233 bool ShouldContinue() const;
238 void Cancel();
239
243 void Restart();
254 inline uint64_t GetBytesTransferred() const { return m_bytesTransferred.load(); }
258 void UpdateBytesTransferred(uint64_t amount) { m_bytesTransferred.fetch_add(amount); }
259
263 inline uint64_t GetBytesOffset() const { return m_offset; }
267 inline uint64_t GetBytesTotalSize() const { return m_bytesTotalSize.load(); }
271 inline void SetBytesTotalSize(uint64_t value) { m_bytesTotalSize.store(value); }
272
279 inline uint64_t GetBytesAvailableFromStart() const { return m_bytesAvailableFromStart.load(std::memory_order_relaxed); }
280
284 inline const Aws::String& GetBucketName() const { return m_bucket; }
288 inline const Aws::String& GetKey() const { return m_key; }
293 inline const Aws::String& GetTargetFilePath() const { return m_fileName; }
294
298 const Aws::String GetVersionId() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_versionId; }
299 void SetVersionId(const Aws::String& versionId) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_versionId = versionId; }
300
304 const Aws::String GetEtag() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_etag; }
305 void SetEtag(const Aws::String& etag) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_etag = etag; }
306
310 inline TransferDirection GetTransferDirection() const { return m_direction; }
314 inline const Aws::String GetContentType() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_contentType; }
318 inline void SetContentType(const Aws::String& value) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_contentType = value; }
323 inline const Aws::Map<Aws::String, Aws::String> GetMetadata() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_metadata; }
328 inline void SetMetadata(const Aws::Map<Aws::String, Aws::String>& value) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_metadata = value; }
329
333 inline void AddMetadataEntry(const Aws::String& key, const Aws::String& value) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_metadata[key] = value; }
334
338 inline void SetContext(const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_context = context; }
339
343 inline std::shared_ptr<const Aws::Client::AsyncCallerContext> GetContext() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_context; }
344
353
358 inline const Aws::Client::AWSError<Aws::S3::S3Errors> GetLastError() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_lastError; }
363 inline void SetError(const Aws::Client::AWSError<Aws::S3::S3Errors>& error) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_lastError = error; }
367 void WaitUntilFinished() const;
368
369 const CreateDownloadStreamCallback& GetCreateDownloadStreamFunction() const { return m_createDownloadStreamFn; }
370
// Takes a part stream and a write offset and returns an Aws::String; defined out of line.
// NOTE(review): the meaning of the returned string is not visible in this header — confirm against the implementation.
375 Aws::String WritePartToDownloadStream(Aws::IOStream* partStream, uint64_t writeOffset);
// Takes a part stream and the part's shared state; defined out of line.
// NOTE(review): presumably computes a checksum over the stream and stores it on the part — confirm.
376 void AddChecksumForPart(Aws:: IOStream* partStream, const PartPointer& shared);
377
379
381 {
382 bool expected = false;
383 return m_lastPart.compare_exchange_strong(expected, true/*desired*/);
384 }
385
386 /*
387 * Returns a unique identifier tied to this particular transfer handle.
388 */
390
391 Aws::String GetChecksum() const { return m_checksum; }
392 void SetChecksum(const Aws::String& checksum) { this->m_checksum = checksum; }
393
394 Aws::S3::Model::ChecksumAlgorithm GetChecksumAlgorithm() const { std::lock_guard<std::mutex> locker(m_getterSetterLock); return m_checksumAlgorithm; }
395 void SetChecksumAlgorithm (const Aws::S3::Model::ChecksumAlgorithm& checksumAlgorithm) { std::lock_guard<std::mutex> locker(m_getterSetterLock); m_checksumAlgorithm = checksumAlgorithm; }
396
397 private:
398 void CleanupDownloadStream();
399
400 std::atomic<bool> m_isMultipart;
401 Aws::String m_multipartId;
402 TransferDirection m_direction;
403 PartStateMap m_completedParts;
404 PartStateMap m_pendingParts;
405 PartStateMap m_queuedParts;
406 PartStateMap m_failedParts;
407 std::atomic<uint64_t> m_bytesTransferred;
408 std::atomic<bool> m_lastPart;
409 std::atomic<uint64_t> m_bytesTotalSize;
410 std::atomic<uint64_t> m_bytesAvailableFromStart;
411 /* The next part number to watch, that is able to grow m_bytesAvailableFromStart. */
412 uint32_t m_nextPartToWatch;
413 uint64_t m_offset;
414 Aws::String m_bucket;
415 Aws::String m_key;
416 Aws::String m_fileName;
417 Aws::String m_contentType;
418 Aws::String m_versionId;
419 Aws::String m_etag;
421 TransferStatus m_status;
423 std::atomic<bool> m_cancel;
424 std::shared_ptr<const Aws::Client::AsyncCallerContext> m_context;
425 const Utils::UUID m_handleId;
426
427 CreateDownloadStreamCallback m_createDownloadStreamFn;
428 Aws::IOStream* m_downloadStream;
429 /* in case customer stream is not based off 0 */
430 uint64_t m_downloadStreamBaseOffset;
431
432 mutable std::mutex m_downloadStreamLock;
433 mutable std::mutex m_partsLock;
434 mutable std::mutex m_statusLock;
435 mutable std::condition_variable m_waitUntilFinishedSignal;
436 mutable std::mutex m_getterSetterLock;
437 Aws::String m_checksum;
438 Aws::S3::Model::ChecksumAlgorithm m_checksumAlgorithm;
439 };
440
442 }
443}
const Aws::String & GetETag() const
void OnDataTransferred(uint64_t amount, const std::shared_ptr< TransferHandle > &transferHandle)
uint64_t GetBestProgressInBytes() const
void SetDownloadPartStream(Aws::IOStream *downloadPartStream)
unsigned char * GetDownloadBuffer() const
void SetDownloadBuffer(unsigned char *downloadBuffer)
PartState(int partId, uint64_t bestProgressInBytes, uint64_t sizeInBytes, bool lastPart=false)
void SetETag(const Aws::String &eTag)
uint64_t GetRangeBegin() const
void SetRangeBegin(uint64_t rangeBegin)
Aws::String GetChecksum() const
uint64_t GetSizeInBytes() const
void SetSizeInBytes(uint64_t sizeInBytes)
void SetBestProgressInBytes(uint64_t progressInBytes)
void SetChecksum(const Aws::String &checksum)
Aws::IOStream * GetDownloadPartStream() const
const Aws::String & GetKey() const
const Aws::String GetVersionId() const
uint64_t GetBytesAvailableFromStart() const
std::shared_ptr< const Aws::Client::AsyncCallerContext > GetContext() const
void UpdateStatus(TransferStatus value)
PartStateMap GetPendingParts() const
const Aws::String GetContentType() const
TransferStatus GetStatus() const
PartStateMap GetFailedParts() const
void AddQueuedPart(const PartPointer &partState)
PartStateMap GetCompletedParts() const
const CreateDownloadStreamCallback & GetCreateDownloadStreamFunction() const
TransferHandle(const Aws::String &bucketName, const Aws::String &keyName, const uint64_t fileOffset, const uint64_t downloadBytes, CreateDownloadStreamCallback createDownloadStreamFn, const Aws::String &targetFilePath="")
TransferHandle(const Aws::String &bucketName, const Aws::String &keyName, const Aws::String &targetFilePath="")
void UpdateBytesTransferred(uint64_t amount)
void SetChecksumAlgorithm(const Aws::S3::Model::ChecksumAlgorithm &checksumAlgorithm)
void ChangePartToCompleted(const PartPointer &partState, const Aws::String &eTag)
void SetBytesTotalSize(uint64_t value)
const Aws::String & GetTargetFilePath() const
void SetMetadata(const Aws::Map< Aws::String, Aws::String > &value)
PartStateMap GetQueuedParts() const
Aws::String GetId() const
Aws::S3::Model::ChecksumAlgorithm GetChecksumAlgorithm() const
const Aws::String GetEtag() const
const Aws::String GetMultiPartId() const
void SetMultipartId(const Aws::String &value)
TransferHandle(const Aws::String &bucketName, const Aws::String &keyName, uint64_t totalSize, const Aws::String &targetFilePath="")
void SetContext(const std::shared_ptr< const Aws::Client::AsyncCallerContext > &context)
void AddMetadataEntry(const Aws::String &key, const Aws::String &value)
void AddChecksumForPart(Aws::IOStream *partStream, const PartPointer &shared)
TransferDirection GetTransferDirection() const
Aws::String WritePartToDownloadStream(Aws::IOStream *partStream, uint64_t writeOffset)
const Aws::Map< Aws::String, Aws::String > GetMetadata() const
void AddPendingPart(const PartPointer &partState)
uint64_t GetBytesTransferred() const
void SetEtag(const Aws::String &etag)
Aws::String GetChecksum() const
const Aws::Client::AWSError< Aws::S3::S3Errors > GetLastError() const
void SetError(const Aws::Client::AWSError< Aws::S3::S3Errors > &error)
TransferHandle(const Aws::String &bucketName, const Aws::String &keyName, CreateDownloadStreamCallback createDownloadStreamFn, const Aws::String &targetFilePath="")
void SetChecksum(const Aws::String &checksum)
void ApplyDownloadConfiguration(const DownloadConfiguration &downloadConfig)
void GetAllPartsTransactional(PartStateMap &queuedParts, PartStateMap &pendingParts, PartStateMap &failedParts, PartStateMap &completedParts)
const Aws::String & GetBucketName() const
void SetVersionId(const Aws::String &versionId)
void SetContentType(const Aws::String &value)
void ChangePartToFailed(const PartPointer &partState)
Aws::Map< int, PartPointer > PartStateMap
static const char CLASS_TAG[]
AWS_TRANSFER_API Aws::OStream & operator<<(Aws::OStream &s, TransferStatus status)
std::shared_ptr< PartState > PartPointer
std::function< Aws::IOStream *(void)> CreateDownloadStreamCallback
std::map< K, V, std::less< K >, Aws::Allocator< std::pair< const K, V > > > Map
std::basic_iostream< char, std::char_traits< char > > IOStream
std::array< T, N > Array
std::basic_string< char, std::char_traits< char >, Aws::Allocator< char > > String
std::basic_ostream< char, std::char_traits< char > > OStream