add private interface to disable reliable stream (#4158)

This commit is contained in:
JinmingHu 2022-12-06 11:58:17 +08:00 committed by GitHub
parent 8ec8cc2ce5
commit 5f93ee3d3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -8,42 +8,54 @@
using Azure::Core::Context;
using Azure::Core::IO::BodyStream;
namespace Azure { namespace Storage { namespace _internal {
namespace Azure { namespace Storage {
size_t ReliableStream::OnRead(uint8_t* buffer, size_t count, Context const& context)
{
(void)context;
for (int64_t intent = 1;; intent++)
namespace {
bool g_reliableStreamEnabled = true;
}
namespace _detail {
void EnableReliableStream() { g_reliableStreamEnabled = true; }
void DisableReliableStream() { g_reliableStreamEnabled = false; }
} // namespace _detail
namespace _internal {
size_t ReliableStream::OnRead(uint8_t* buffer, size_t count, Context const& context)
{
// check if we need to get inner stream
if (this->m_inner == nullptr)
(void)context;
for (int64_t intent = 1;; intent++)
{
// Get a bodyStream that starts from last known offset
// if this fails, throw bubbles up
// As m_inner is unique_pr, it will be destructed on reassignment, cleaning up network
// session.
this->m_inner = this->m_streamReconnector(this->m_retryOffset, context);
}
try
{
auto const readBytes = this->m_inner->Read(buffer, count, context);
// update offset
this->m_retryOffset += readBytes;
return readBytes;
}
catch (std::runtime_error const& e)
{
// forget about the inner stream. We will need to request a new one
// As m_inner is unique_pr, it will be destructed on reassignment (cleaning up network
// session).
this->m_inner.reset();
(void)e; // todo: maybe log the exception in the future?
if (intent == this->m_options.MaxRetryRequests)
// check if we need to get inner stream
if (this->m_inner == nullptr)
{
// max retry. End loop. Rethrow
throw;
// Get a bodyStream that starts from last known offset
// if this fails, throw bubbles up
// As m_inner is unique_pr, it will be destructed on reassignment, cleaning up network
// session.
this->m_inner = this->m_streamReconnector(this->m_retryOffset, context);
}
try
{
auto const readBytes = this->m_inner->Read(buffer, count, context);
// update offset
this->m_retryOffset += readBytes;
return readBytes;
}
catch (std::runtime_error const& e)
{
// forget about the inner stream. We will need to request a new one
// As m_inner is unique_pr, it will be destructed on reassignment (cleaning up network
// session).
this->m_inner.reset();
(void)e; // todo: maybe log the exception in the future?
if (intent == this->m_options.MaxRetryRequests || !g_reliableStreamEnabled)
{
// max retry. End loop. Rethrow
throw;
}
}
}
}
}
}}} // namespace Azure::Storage::_internal
} // namespace _internal
}} // namespace Azure::Storage