From 5f93ee3d3ee5f0f8706b019e1d08af0199a51e53 Mon Sep 17 00:00:00 2001 From: JinmingHu Date: Tue, 6 Dec 2022 11:58:17 +0800 Subject: [PATCH] add private interface to disable reliable stream (#4158) --- .../src/reliable_stream.cpp | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/sdk/storage/azure-storage-common/src/reliable_stream.cpp b/sdk/storage/azure-storage-common/src/reliable_stream.cpp index 8611a3719..ab55d9a55 100644 --- a/sdk/storage/azure-storage-common/src/reliable_stream.cpp +++ b/sdk/storage/azure-storage-common/src/reliable_stream.cpp @@ -8,42 +8,54 @@ using Azure::Core::Context; using Azure::Core::IO::BodyStream; -namespace Azure { namespace Storage { namespace _internal { +namespace Azure { namespace Storage { - size_t ReliableStream::OnRead(uint8_t* buffer, size_t count, Context const& context) - { - (void)context; - for (int64_t intent = 1;; intent++) + namespace { + bool g_reliableStreamEnabled = true; + } + + namespace _detail { + void EnableReliableStream() { g_reliableStreamEnabled = true; } + void DisableReliableStream() { g_reliableStreamEnabled = false; } + } // namespace _detail + + namespace _internal { + + size_t ReliableStream::OnRead(uint8_t* buffer, size_t count, Context const& context) { - // check if we need to get inner stream - if (this->m_inner == nullptr) + (void)context; + for (int64_t intent = 1;; intent++) { - // Get a bodyStream that starts from last known offset - // if this fails, throw bubbles up - // As m_inner is unique_pr, it will be destructed on reassignment, cleaning up network - // session. - this->m_inner = this->m_streamReconnector(this->m_retryOffset, context); - } - try - { - auto const readBytes = this->m_inner->Read(buffer, count, context); - // update offset - this->m_retryOffset += readBytes; - return readBytes; - } - catch (std::runtime_error const& e) - { - // forget about the inner stream. We will need to request a new one - // As m_inner is unique_pr, it will be destructed on reassignment (cleaning up network - // session). - this->m_inner.reset(); - (void)e; // todo: maybe log the exception in the future? - if (intent == this->m_options.MaxRetryRequests) + // check if we need to get inner stream + if (this->m_inner == nullptr) { - // max retry. End loop. Rethrow - throw; + // Get a bodyStream that starts from last known offset + // if this fails, throw bubbles up + // As m_inner is unique_pr, it will be destructed on reassignment, cleaning up network + // session. + this->m_inner = this->m_streamReconnector(this->m_retryOffset, context); + } + try + { + auto const readBytes = this->m_inner->Read(buffer, count, context); + // update offset + this->m_retryOffset += readBytes; + return readBytes; + } + catch (std::runtime_error const& e) + { + // forget about the inner stream. We will need to request a new one + // As m_inner is unique_pr, it will be destructed on reassignment (cleaning up network + // session). + this->m_inner.reset(); + (void)e; // todo: maybe log the exception in the future? + if (intent == this->m_options.MaxRetryRequests || !g_reliableStreamEnabled) + { + // max retry. End loop. Rethrow + throw; + } } } } - } -}}} // namespace Azure::Storage::_internal + } // namespace _internal +}} // namespace Azure::Storage