From 9d9f1e6f4c590eb159e401e6c4dcdedea217b2a2 Mon Sep 17 00:00:00 2001 From: Victor Vazquez Date: Tue, 5 Jan 2021 18:38:06 -0800 Subject: [PATCH] remove cancellation check from retry policies (#1196) * remove cancellation check from retry policies * Update BodyStream to do cancellation check on Read --- .../inc/azure/core/http/body_stream.hpp | 48 +++++++++++++------ .../core/http/winhttp/win_http_client.hpp | 20 ++++---- sdk/core/azure-core/src/http/body_stream.cpp | 17 +++---- sdk/core/azure-core/src/http/curl/curl.cpp | 4 +- .../src/http/curl/curl_session_private.hpp | 20 ++++---- sdk/core/azure-core/src/http/retry_policy.cpp | 3 +- .../azure-core/src/http/transport_policy.cpp | 2 + .../src/http/winhttp/win_http_transport.cpp | 9 ++-- .../azure/storage/common/reliable_stream.hpp | 3 +- .../src/reliable_stream.cpp | 3 +- .../src/storage_retry_policy.cpp | 3 +- 11 files changed, 74 insertions(+), 58 deletions(-) diff --git a/sdk/core/azure-core/inc/azure/core/http/body_stream.hpp b/sdk/core/azure-core/inc/azure/core/http/body_stream.hpp index 536876619..51c8e3c00 100644 --- a/sdk/core/azure-core/inc/azure/core/http/body_stream.hpp +++ b/sdk/core/azure-core/inc/azure/core/http/body_stream.hpp @@ -37,6 +37,20 @@ namespace Azure { namespace Core { namespace Http { *@brief Used to read data to/from a service. */ class BodyStream { + private: + /** + * @brief Read portion of data into a buffer. + * + * @remark This is the `OnRead` implementation that all derived classes need to provide. + * + * @param context #Context so that operation can be cancelled. + * @param buffer Pointer to a byte buffer to read the data into. + * @param count Size of the buffer to read the data into. + * + * @return Number of bytes read. + */ + virtual int64_t OnRead(Context const& context, uint8_t* buffer, int64_t count) = 0; + public: /// Destructor. virtual ~BodyStream() = default; @@ -67,7 +81,11 @@ namespace Azure { namespace Core { namespace Http { * * @return Number of bytes read. */ - virtual int64_t Read(Context const& context, uint8_t* buffer, int64_t count) = 0; + int64_t Read(Context const& context, uint8_t* buffer, int64_t count) + { + context.ThrowIfCancelled(); + return OnRead(context, buffer, count); + }; /** * @brief Read #BodyStream into a buffer until the buffer is filled, or until the stream is @@ -107,6 +125,8 @@ namespace Azure { namespace Core { namespace Http { int64_t m_length; int64_t m_offset = 0; + int64_t OnRead(Context const& context, uint8_t* buffer, int64_t count) override; + public: // Forbid constructor for rval so we don't end up storing dangling ptr MemoryBodyStream(std::vector const&&) = delete; @@ -134,8 +154,6 @@ namespace Azure { namespace Core { namespace Http { int64_t Length() const override { return this->m_length; } - int64_t Read(Context const& context, uint8_t* buffer, int64_t count) override; - void Rewind() override { m_offset = 0; } }; @@ -144,6 +162,15 @@ namespace Azure { namespace Core { namespace Http { * @remark Used for requests with no body. */ class NullBodyStream : public Azure::Core::Http::BodyStream { + private: + int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override + { + (void)context; + (void)buffer; + (void)count; + return 0; + }; + public: /// Constructor. explicit NullBodyStream() {} @@ -152,14 +179,6 @@ namespace Azure { namespace Core { namespace Http { void Rewind() override {} - int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override - { - (void)context; - (void)buffer; - (void)count; - return 0; - }; - /** * @brief Gets a singleton instance of a #NullBodyStream. */ @@ -186,6 +205,8 @@ namespace Azure { namespace Core { namespace Http { // mutable int64_t m_offset; + int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; + public: #if defined(AZ_PLATFORM_POSIX) /** @@ -216,8 +237,6 @@ namespace Azure { namespace Core { namespace Http { // Rewind seek back to 0 void Rewind() override { this->m_offset = 0; } - int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; - int64_t Length() const override { return this->m_length; }; }; @@ -230,6 +249,8 @@ namespace Azure { namespace Core { namespace Http { int64_t m_length; int64_t m_bytesRead = 0; + int64_t OnRead(Context const& context, uint8_t* buffer, int64_t count) override; + public: /** * @brief Construct from another #BodyStream. @@ -248,7 +269,6 @@ namespace Azure { namespace Core { namespace Http { this->m_inner->Rewind(); this->m_bytesRead = 0; } - int64_t Read(Context const& context, uint8_t* buffer, int64_t count) override; }; }}} // namespace Azure::Core::Http diff --git a/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp b/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp index dd21cc0fb..6fdc56345 100644 --- a/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp +++ b/sdk/core/azure-core/inc/azure/core/http/winhttp/win_http_client.hpp @@ -84,6 +84,16 @@ namespace Azure { namespace Core { namespace Http { int64_t m_streamTotalRead; + /** + * @brief Implement #BodyStream `OnRead`. Calling this function pulls data from the wire. + * + * @param context #Context so that operation can be cancelled. + * @param buffer Buffer where data from wire is written to. + * @param count The number of bytes to read from the network. + * @return The actual number of bytes read from the network. + */ + int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; + public: WinHttpStream(std::unique_ptr handleManager, int64_t contentLength) : m_handleManager(std::move(handleManager)), m_contentLength(contentLength), @@ -97,16 +107,6 @@ namespace Azure { namespace Core { namespace Http { * @return The size of the payload. */ int64_t Length() const override { return this->m_contentLength; } - - /** - * @brief Implement #BodyStream read. Calling this function pulls data from the wire. - * - * @param context #Context so that operation can be cancelled. - * @param buffer Buffer where data from wire is written to. - * @param count The number of bytes to read from the network. - * @return The actual number of bytes read from the network. - */ - int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; }; } // namespace Details diff --git a/sdk/core/azure-core/src/http/body_stream.cpp b/sdk/core/azure-core/src/http/body_stream.cpp index e5f26874f..3809989f2 100644 --- a/sdk/core/azure-core/src/http/body_stream.cpp +++ b/sdk/core/azure-core/src/http/body_stream.cpp @@ -70,10 +70,9 @@ std::vector BodyStream::ReadToEnd(Context const& context, BodyStream& b } } -int64_t MemoryBodyStream::Read(Context const& context, uint8_t* buffer, int64_t count) +int64_t MemoryBodyStream::OnRead(Context const& context, uint8_t* buffer, int64_t count) { - context.ThrowIfCancelled(); - + (void)context; int64_t copy_length = std::min(count, static_cast(this->m_length - this->m_offset)); // Copy what's left or just the count std::memcpy(buffer, this->m_data + m_offset, static_cast(copy_length)); @@ -84,10 +83,9 @@ int64_t MemoryBodyStream::Read(Context const& context, uint8_t* buffer, int64_t } #if defined(AZ_PLATFORM_POSIX) -int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) +int64_t FileBodyStream::OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) { - context.ThrowIfCancelled(); - + (void)context; auto result = pread( this->m_fd, buffer, @@ -103,10 +101,9 @@ int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffe return result; } #elif defined(AZ_PLATFORM_WINDOWS) -int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) +int64_t FileBodyStream::OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) { - context.ThrowIfCancelled(); - + (void)context; DWORD numberOfBytesRead; auto o = OVERLAPPED(); o.Offset = (DWORD)(this->m_baseOffset + this->m_offset); @@ -136,7 +133,7 @@ int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffe } #endif -int64_t LimitBodyStream::Read(Context const& context, uint8_t* buffer, int64_t count) +int64_t LimitBodyStream::OnRead(Context const& context, uint8_t* buffer, int64_t count) { (void)context; // Read up to count or whatever length is remaining; whichever is less diff --git a/sdk/core/azure-core/src/http/curl/curl.cpp b/sdk/core/azure-core/src/http/curl/curl.cpp index b97cd7869..569abbb92 100644 --- a/sdk/core/azure-core/src/http/curl/curl.cpp +++ b/sdk/core/azure-core/src/http/curl/curl.cpp @@ -604,10 +604,8 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse( } // Read from curl session -int64_t CurlSession::Read(Context const& context, uint8_t* buffer, int64_t count) +int64_t CurlSession::OnRead(Context const& context, uint8_t* buffer, int64_t count) { - context.ThrowIfCancelled(); - if (count <= 0 || this->IsEOF()) { return 0; diff --git a/sdk/core/azure-core/src/http/curl/curl_session_private.hpp b/sdk/core/azure-core/src/http/curl/curl_session_private.hpp index 45ef673f0..d4462a634 100644 --- a/sdk/core/azure-core/src/http/curl/curl_session_private.hpp +++ b/sdk/core/azure-core/src/http/curl/curl_session_private.hpp @@ -342,6 +342,16 @@ namespace Azure { namespace Core { namespace Http { */ bool m_keepAlive = true; + /** + * @brief Implement #BodyStream `OnRead`. Calling this function pulls data from the wire. + * + * @param context #Context so that operation can be cancelled. + * @param buffer Buffer where data from wire is written to. + * @param count The number of bytes to read from the network. + * @return The actual number of bytes read from the network. + */ + int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; + public: /** * @brief Construct a new Curl Session object. Init internal libcurl handler. @@ -390,16 +400,6 @@ namespace Azure { namespace Core { namespace Http { * @return The size of the payload. */ int64_t Length() const override { return m_contentLength; } - - /** - * @brief Implement #BodyStream read. Calling this function pulls data from the wire. - * - * @param context #Context so that operation can be cancelled. - * @param buffer Buffer where data from wire is written to. - * @param count The number of bytes to read from the network. - * @return The actual number of bytes read from the network. - */ - int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; }; }}} // namespace Azure::Core::Http diff --git a/sdk/core/azure-core/src/http/retry_policy.cpp b/sdk/core/azure-core/src/http/retry_policy.cpp index 3b3d7c71d..81f2eb133 100644 --- a/sdk/core/azure-core/src/http/retry_policy.cpp +++ b/sdk/core/azure-core/src/http/retry_policy.cpp @@ -177,12 +177,11 @@ std::unique_ptr Azure::Core::Http::RetryPolicy::Send( // we proceed immediately if it is 0. if (retryAfter.count() > 0) { + ctx.ThrowIfCancelled(); std::this_thread::sleep_for(retryAfter); } // Restore the original query parameters before next retry request.GetUrl().SetQueryParameters(std::move(originalQueryParameters)); - - ctx.ThrowIfCancelled(); } } diff --git a/sdk/core/azure-core/src/http/transport_policy.cpp b/sdk/core/azure-core/src/http/transport_policy.cpp index 06c625cab..73b5b2043 100644 --- a/sdk/core/azure-core/src/http/transport_policy.cpp +++ b/sdk/core/azure-core/src/http/transport_policy.cpp @@ -25,6 +25,8 @@ std::unique_ptr TransportPolicy::Send( NextHttpPolicy nextHttpPolicy) const { (void)nextHttpPolicy; + ctx.ThrowIfCancelled(); + /** * The transport policy is always the last policy. * Call the transport and return diff --git a/sdk/core/azure-core/src/http/winhttp/win_http_transport.cpp b/sdk/core/azure-core/src/http/winhttp/win_http_transport.cpp index 071ebfd6e..6d8531286 100644 --- a/sdk/core/azure-core/src/http/winhttp/win_http_transport.cpp +++ b/sdk/core/azure-core/src/http/winhttp/win_http_transport.cpp @@ -557,10 +557,8 @@ std::unique_ptr WinHttpTransport::Send(Context const& context, Requ } // Read the response from the sent request. -int64_t Details::WinHttpStream::Read(Context const& context, uint8_t* buffer, int64_t count) +int64_t Details::WinHttpStream::OnRead(Context const& context, uint8_t* buffer, int64_t count) { - context.ThrowIfCancelled(); - if (count <= 0 || this->m_isEOF) { return 0; @@ -587,14 +585,15 @@ int64_t Details::WinHttpStream::Read(Context const& context, uint8_t* buffer, in + std::to_string(error) + "."); } - context.ThrowIfCancelled(); - DWORD numberOfBytesToRead = numberOfBytesAvailable; if (numberOfBytesAvailable > count) { numberOfBytesToRead = static_cast(count); } + // Check context cancellation again before the next I/O + context.ThrowIfCancelled(); + if (!WinHttpReadData( this->m_handleManager->m_requestHandle, (LPVOID)(buffer), diff --git a/sdk/storage/azure-storage-common/inc/azure/storage/common/reliable_stream.hpp b/sdk/storage/azure-storage-common/inc/azure/storage/common/reliable_stream.hpp index 6f80a5604..d33966dd8 100644 --- a/sdk/storage/azure-storage-common/inc/azure/storage/common/reliable_stream.hpp +++ b/sdk/storage/azure-storage-common/inc/azure/storage/common/reliable_stream.hpp @@ -52,6 +52,8 @@ namespace Azure { namespace Storage { // Options to use when getting a new bodyStream like current offset HttpGetterInfo m_retryInfo; + int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; + public: explicit ReliableStream( std::unique_ptr inner, @@ -68,7 +70,6 @@ namespace Azure { namespace Storage { this->m_inner->Rewind(); this->m_retryInfo.Offset = 0; } - int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override; }; }} // namespace Azure::Storage diff --git a/sdk/storage/azure-storage-common/src/reliable_stream.cpp b/sdk/storage/azure-storage-common/src/reliable_stream.cpp index 21edb1547..a28beaefd 100644 --- a/sdk/storage/azure-storage-common/src/reliable_stream.cpp +++ b/sdk/storage/azure-storage-common/src/reliable_stream.cpp @@ -10,8 +10,9 @@ using Azure::Core::Http::BodyStream; namespace Azure { namespace Storage { - int64_t ReliableStream::Read(Context const& context, uint8_t* buffer, int64_t count) + int64_t ReliableStream::OnRead(Context const& context, uint8_t* buffer, int64_t count) { + (void)context; for (int64_t intent = 1;; intent++) { // check if we need to get inner stream diff --git a/sdk/storage/azure-storage-common/src/storage_retry_policy.cpp b/sdk/storage/azure-storage-common/src/storage_retry_policy.cpp index 3dfeed4e7..e3aada3fc 100644 --- a/sdk/storage/azure-storage-common/src/storage_retry_policy.cpp +++ b/sdk/storage/azure-storage-common/src/storage_retry_policy.cpp @@ -95,8 +95,6 @@ namespace Azure { namespace Storage { namespace Details { switchHost(); - ctx.ThrowIfCancelled(); - const int64_t baseRetryDelayMs = m_options.RetryDelay.count(); const int64_t maxRetryDelayMs = m_options.MaxRetryDelay.count(); int64_t retryDelayMs = maxRetryDelayMs; @@ -124,6 +122,7 @@ namespace Azure { namespace Storage { namespace Details { if (retryDelayMs != 0) { + ctx.ThrowIfCancelled(); std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs)); } }