remove cancellation check from retry policies (#1196)
* remove cancellation check from retry policies * Update BodyStream to do cancellation check on Read
This commit is contained in:
parent
e141cf1a6b
commit
9d9f1e6f4c
@ -37,6 +37,20 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*@brief Used to read data to/from a service.
|
||||
*/
|
||||
class BodyStream {
|
||||
private:
|
||||
/**
|
||||
* @brief Read portion of data into a buffer.
|
||||
*
|
||||
* @remark This is the `OnRead` implementation that all derived classes need to provide.
|
||||
*
|
||||
* @param context #Context so that operation can be cancelled.
|
||||
* @param buffer Pointer to a byte buffer to read the data into.
|
||||
* @param count Size of the buffer to read the data into.
|
||||
*
|
||||
* @return Number of bytes read.
|
||||
*/
|
||||
virtual int64_t OnRead(Context const& context, uint8_t* buffer, int64_t count) = 0;
|
||||
|
||||
public:
|
||||
/// Destructor.
|
||||
virtual ~BodyStream() = default;
|
||||
@ -67,7 +81,11 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*
|
||||
* @return Number of bytes read.
|
||||
*/
|
||||
virtual int64_t Read(Context const& context, uint8_t* buffer, int64_t count) = 0;
|
||||
int64_t Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCancelled();
|
||||
return OnRead(context, buffer, count);
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Read #BodyStream into a buffer until the buffer is filled, or until the stream is
|
||||
@ -107,6 +125,8 @@ namespace Azure { namespace Core { namespace Http {
|
||||
int64_t m_length;
|
||||
int64_t m_offset = 0;
|
||||
|
||||
int64_t OnRead(Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
public:
|
||||
// Forbid constructor for rval so we don't end up storing dangling ptr
|
||||
MemoryBodyStream(std::vector<uint8_t> const&&) = delete;
|
||||
@ -134,8 +154,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
|
||||
int64_t Length() const override { return this->m_length; }
|
||||
|
||||
int64_t Read(Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
void Rewind() override { m_offset = 0; }
|
||||
};
|
||||
|
||||
@ -144,6 +162,15 @@ namespace Azure { namespace Core { namespace Http {
|
||||
* @remark Used for requests with no body.
|
||||
*/
|
||||
class NullBodyStream : public Azure::Core::Http::BodyStream {
|
||||
private:
|
||||
int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override
|
||||
{
|
||||
(void)context;
|
||||
(void)buffer;
|
||||
(void)count;
|
||||
return 0;
|
||||
};
|
||||
|
||||
public:
|
||||
/// Constructor.
|
||||
explicit NullBodyStream() {}
|
||||
@ -152,14 +179,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
|
||||
void Rewind() override {}
|
||||
|
||||
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override
|
||||
{
|
||||
(void)context;
|
||||
(void)buffer;
|
||||
(void)count;
|
||||
return 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Gets a singleton instance of a #NullBodyStream.
|
||||
*/
|
||||
@ -186,6 +205,8 @@ namespace Azure { namespace Core { namespace Http {
|
||||
// mutable
|
||||
int64_t m_offset;
|
||||
|
||||
int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
public:
|
||||
#if defined(AZ_PLATFORM_POSIX)
|
||||
/**
|
||||
@ -216,8 +237,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
// Rewind seek back to 0
|
||||
void Rewind() override { this->m_offset = 0; }
|
||||
|
||||
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
int64_t Length() const override { return this->m_length; };
|
||||
};
|
||||
|
||||
@ -230,6 +249,8 @@ namespace Azure { namespace Core { namespace Http {
|
||||
int64_t m_length;
|
||||
int64_t m_bytesRead = 0;
|
||||
|
||||
int64_t OnRead(Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Construct from another #BodyStream.
|
||||
@ -248,7 +269,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
this->m_inner->Rewind();
|
||||
this->m_bytesRead = 0;
|
||||
}
|
||||
int64_t Read(Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
};
|
||||
|
||||
}}} // namespace Azure::Core::Http
|
||||
|
||||
@ -84,6 +84,16 @@ namespace Azure { namespace Core { namespace Http {
|
||||
|
||||
int64_t m_streamTotalRead;
|
||||
|
||||
/**
|
||||
* @brief Implement #BodyStream `OnRead`. Calling this function pulls data from the wire.
|
||||
*
|
||||
* @param context #Context so that operation can be cancelled.
|
||||
* @param buffer Buffer where data from wire is written to.
|
||||
* @param count The number of bytes to read from the network.
|
||||
* @return The actual number of bytes read from the network.
|
||||
*/
|
||||
int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
public:
|
||||
WinHttpStream(std::unique_ptr<HandleManager> handleManager, int64_t contentLength)
|
||||
: m_handleManager(std::move(handleManager)), m_contentLength(contentLength),
|
||||
@ -97,16 +107,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
* @return The size of the payload.
|
||||
*/
|
||||
int64_t Length() const override { return this->m_contentLength; }
|
||||
|
||||
/**
|
||||
* @brief Implement #BodyStream read. Calling this function pulls data from the wire.
|
||||
*
|
||||
* @param context #Context so that operation can be cancelled.
|
||||
* @param buffer Buffer where data from wire is written to.
|
||||
* @param count The number of bytes to read from the network.
|
||||
* @return The actual number of bytes read from the network.
|
||||
*/
|
||||
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
};
|
||||
} // namespace Details
|
||||
|
||||
|
||||
@ -70,10 +70,9 @@ std::vector<uint8_t> BodyStream::ReadToEnd(Context const& context, BodyStream& b
|
||||
}
|
||||
}
|
||||
|
||||
int64_t MemoryBodyStream::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t MemoryBodyStream::OnRead(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
(void)context;
|
||||
int64_t copy_length = std::min(count, static_cast<int64_t>(this->m_length - this->m_offset));
|
||||
// Copy what's left or just the count
|
||||
std::memcpy(buffer, this->m_data + m_offset, static_cast<size_t>(copy_length));
|
||||
@ -84,10 +83,9 @@ int64_t MemoryBodyStream::Read(Context const& context, uint8_t* buffer, int64_t
|
||||
}
|
||||
|
||||
#if defined(AZ_PLATFORM_POSIX)
|
||||
int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t FileBodyStream::OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
(void)context;
|
||||
auto result = pread(
|
||||
this->m_fd,
|
||||
buffer,
|
||||
@ -103,10 +101,9 @@ int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffe
|
||||
return result;
|
||||
}
|
||||
#elif defined(AZ_PLATFORM_WINDOWS)
|
||||
int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t FileBodyStream::OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
(void)context;
|
||||
DWORD numberOfBytesRead;
|
||||
auto o = OVERLAPPED();
|
||||
o.Offset = (DWORD)(this->m_baseOffset + this->m_offset);
|
||||
@ -136,7 +133,7 @@ int64_t FileBodyStream::Read(Azure::Core::Context const& context, uint8_t* buffe
|
||||
}
|
||||
#endif
|
||||
|
||||
int64_t LimitBodyStream::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t LimitBodyStream::OnRead(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
(void)context;
|
||||
// Read up to count or whatever length is remaining; whichever is less
|
||||
|
||||
@ -604,10 +604,8 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
|
||||
}
|
||||
|
||||
// Read from curl session
|
||||
int64_t CurlSession::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t CurlSession::OnRead(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
if (count <= 0 || this->IsEOF())
|
||||
{
|
||||
return 0;
|
||||
|
||||
@ -342,6 +342,16 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*/
|
||||
bool m_keepAlive = true;
|
||||
|
||||
/**
|
||||
* @brief Implement #BodyStream `OnRead`. Calling this function pulls data from the wire.
|
||||
*
|
||||
* @param context #Context so that operation can be cancelled.
|
||||
* @param buffer Buffer where data from wire is written to.
|
||||
* @param count The number of bytes to read from the network.
|
||||
* @return The actual number of bytes read from the network.
|
||||
*/
|
||||
int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a new Curl Session object. Init internal libcurl handler.
|
||||
@ -390,16 +400,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
* @return The size of the payload.
|
||||
*/
|
||||
int64_t Length() const override { return m_contentLength; }
|
||||
|
||||
/**
|
||||
* @brief Implement #BodyStream read. Calling this function pulls data from the wire.
|
||||
*
|
||||
* @param context #Context so that operation can be cancelled.
|
||||
* @param buffer Buffer where data from wire is written to.
|
||||
* @param count The number of bytes to read from the network.
|
||||
* @return The actual number of bytes read from the network.
|
||||
*/
|
||||
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
};
|
||||
|
||||
}}} // namespace Azure::Core::Http
|
||||
|
||||
@ -177,12 +177,11 @@ std::unique_ptr<RawResponse> Azure::Core::Http::RetryPolicy::Send(
|
||||
// we proceed immediately if it is 0.
|
||||
if (retryAfter.count() > 0)
|
||||
{
|
||||
ctx.ThrowIfCancelled();
|
||||
std::this_thread::sleep_for(retryAfter);
|
||||
}
|
||||
|
||||
// Restore the original query parameters before next retry
|
||||
request.GetUrl().SetQueryParameters(std::move(originalQueryParameters));
|
||||
|
||||
ctx.ThrowIfCancelled();
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,6 +25,8 @@ std::unique_ptr<RawResponse> TransportPolicy::Send(
|
||||
NextHttpPolicy nextHttpPolicy) const
|
||||
{
|
||||
(void)nextHttpPolicy;
|
||||
ctx.ThrowIfCancelled();
|
||||
|
||||
/**
|
||||
* The transport policy is always the last policy.
|
||||
* Call the transport and return
|
||||
|
||||
@ -557,10 +557,8 @@ std::unique_ptr<RawResponse> WinHttpTransport::Send(Context const& context, Requ
|
||||
}
|
||||
|
||||
// Read the response from the sent request.
|
||||
int64_t Details::WinHttpStream::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t Details::WinHttpStream::OnRead(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
if (count <= 0 || this->m_isEOF)
|
||||
{
|
||||
return 0;
|
||||
@ -587,14 +585,15 @@ int64_t Details::WinHttpStream::Read(Context const& context, uint8_t* buffer, in
|
||||
+ std::to_string(error) + ".");
|
||||
}
|
||||
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
DWORD numberOfBytesToRead = numberOfBytesAvailable;
|
||||
if (numberOfBytesAvailable > count)
|
||||
{
|
||||
numberOfBytesToRead = static_cast<DWORD>(count);
|
||||
}
|
||||
|
||||
// Check context cancellation again before the next I/O
|
||||
context.ThrowIfCancelled();
|
||||
|
||||
if (!WinHttpReadData(
|
||||
this->m_handleManager->m_requestHandle,
|
||||
(LPVOID)(buffer),
|
||||
|
||||
@ -52,6 +52,8 @@ namespace Azure { namespace Storage {
|
||||
// Options to use when getting a new bodyStream like current offset
|
||||
HttpGetterInfo m_retryInfo;
|
||||
|
||||
int64_t OnRead(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
|
||||
public:
|
||||
explicit ReliableStream(
|
||||
std::unique_ptr<Azure::Core::Http::BodyStream> inner,
|
||||
@ -68,7 +70,6 @@ namespace Azure { namespace Storage {
|
||||
this->m_inner->Rewind();
|
||||
this->m_retryInfo.Offset = 0;
|
||||
}
|
||||
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
};
|
||||
|
||||
}} // namespace Azure::Storage
|
||||
|
||||
@ -10,8 +10,9 @@ using Azure::Core::Http::BodyStream;
|
||||
|
||||
namespace Azure { namespace Storage {
|
||||
|
||||
int64_t ReliableStream::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t ReliableStream::OnRead(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
(void)context;
|
||||
for (int64_t intent = 1;; intent++)
|
||||
{
|
||||
// check if we need to get inner stream
|
||||
|
||||
@ -95,8 +95,6 @@ namespace Azure { namespace Storage { namespace Details {
|
||||
|
||||
switchHost();
|
||||
|
||||
ctx.ThrowIfCancelled();
|
||||
|
||||
const int64_t baseRetryDelayMs = m_options.RetryDelay.count();
|
||||
const int64_t maxRetryDelayMs = m_options.MaxRetryDelay.count();
|
||||
int64_t retryDelayMs = maxRetryDelayMs;
|
||||
@ -124,6 +122,7 @@ namespace Azure { namespace Storage { namespace Details {
|
||||
|
||||
if (retryDelayMs != 0)
|
||||
{
|
||||
ctx.ThrowIfCancelled();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user