From 5426cf7a0733d2dfa6e837454ca4f35beca77246 Mon Sep 17 00:00:00 2001 From: Victor Vazquez Date: Mon, 3 Aug 2020 10:09:12 -0700 Subject: [PATCH] Retriable stream (#369) * Adding retriable_stream --- sdk/core/azure-core/inc/http/curl/curl.hpp | 2 - sdk/storage/CMakeLists.txt | 2 + sdk/storage/inc/common/reliable_stream.hpp | 74 ++++++++++++++++++++++ sdk/storage/src/common/reliable_stream.cpp | 52 +++++++++++++++ 4 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 sdk/storage/inc/common/reliable_stream.hpp create mode 100644 sdk/storage/src/common/reliable_stream.cpp diff --git a/sdk/core/azure-core/inc/http/curl/curl.hpp b/sdk/core/azure-core/inc/http/curl/curl.hpp index 09ef42e48..c35345ea8 100644 --- a/sdk/core/azure-core/inc/http/curl/curl.hpp +++ b/sdk/core/azure-core/inc/http/curl/curl.hpp @@ -403,8 +403,6 @@ namespace Azure { namespace Core { namespace Http { int64_t Length() const override { return this->m_contentLength; } - void Rewind() override {} - int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override; }; diff --git a/sdk/storage/CMakeLists.txt b/sdk/storage/CMakeLists.txt index 3fbcd5fe3..1987a41c3 100644 --- a/sdk/storage/CMakeLists.txt +++ b/sdk/storage/CMakeLists.txt @@ -21,6 +21,7 @@ set(AZURE_STORAGE_COMMON_HEADER inc/common/constants.hpp inc/common/crypt.hpp inc/common/file_io.hpp + inc/common/reliable_stream.hpp inc/common/shared_key_policy.hpp inc/common/storage_common.hpp inc/common/storage_credential.hpp @@ -35,6 +36,7 @@ set(AZURE_STORAGE_COMMON_SOURCE src/common/common_headers_request_policy.cpp src/common/crypt.cpp src/common/file_io.cpp + src/common/reliable_stream.cpp src/common/shared_key_policy.cpp src/common/storage_credential.cpp src/common/storage_error.cpp diff --git a/sdk/storage/inc/common/reliable_stream.hpp b/sdk/storage/inc/common/reliable_stream.hpp new file mode 100644 index 000000000..355dc5588 --- /dev/null +++ b/sdk/storage/inc/common/reliable_stream.hpp @@ -0,0 +1,74 @@ 
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+#include "context.hpp"
+#include "http/body_stream.hpp"
+
+#include <functional>
+#include <memory>
+
+namespace Azure { namespace Storage {
+
+  // Options passed to the HTTPGetter callback: the offset the new bodyStream must start from.
+  struct HTTPGetterInfo
+  {
+    int64_t Offset = 0;
+  };
+
+  // Signature of the callback used to get a bodyStream starting from a specific offset.
+  typedef std::function<
+      std::unique_ptr<Azure::Core::Http::BodyStream>(Azure::Core::Context&, HTTPGetterInfo const&)>
+      HTTPGetter;
+
+  // Options used by reliable stream
+  struct ReliableStreamOptions
+  {
+    // Maximum number of Read attempts per call (first try included); <= 1 disables retrying.
+    int64_t MaxRetryRequests = 0;
+    // Use for testing purposes only. When true, Read() throws before touching the stream.
+    bool DoInjectError = false;
+  };
+
+  /**
+   * @brief Decorates a body stream by providing reliability while reading from it.
+   * ReliableStream uses an HTTPGetter callback (provided on constructor) to get a bodyStream
+   * starting on last known offset to resume a failed Read() operation.
+   *
+   * @remark An HTTPGetter callback is expected to verify the initial `eTag` from first http request
+   * to ensure read operation will continue on the same content.
+   *
+   * @remark An HTTPGetter callback is expected to calculate and set the range header based on the
+   * offset provided by the ReliableStream.
+   *
+   */
+  class ReliableStream : public Azure::Core::Http::BodyStream {
+  private:
+    // Current inner bodyStream; null after a failed Read until a replacement is fetched.
+    std::unique_ptr<Azure::Core::Http::BodyStream> m_inner;
+    // Configuration for the re-triable stream
+    ReliableStreamOptions const m_options;
+    // callback to get a bodyStream in case Read operation fails
+    HTTPGetter m_httpGetter;
+    // Options to use when getting a new bodyStream like current offset
+    HTTPGetterInfo m_retryInfo;
+
+  public:
+    explicit ReliableStream(
+        std::unique_ptr<Azure::Core::Http::BodyStream> inner,
+        ReliableStreamOptions const options,
+        HTTPGetter httpGetter)
+        : m_inner(std::move(inner)), m_options(options), m_httpGetter(std::move(httpGetter))
+    {
+    }
+
+    int64_t Length() const override { return this->m_inner->Length(); }
+    void Rewind() override
+    {
+      // Rewind directly from a transportAdapter body stream (like libcurl) would throw
+      this->m_inner->Rewind();
+      this->m_retryInfo.Offset = 0;
+    }
+    int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override;
+  };
+
+}} // namespace Azure::Storage
diff --git a/sdk/storage/src/common/reliable_stream.cpp b/sdk/storage/src/common/reliable_stream.cpp
new file mode 100644
index 000000000..1870e7d87
--- /dev/null
+++ b/sdk/storage/src/common/reliable_stream.cpp
@@ -0,0 +1,52 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// SPDX-License-Identifier: MIT
+
+#include "common/reliable_stream.hpp"
+#include "http/http.hpp"
+
+#include <stdexcept>
+
+using Azure::Core::Context;
+using Azure::Core::Http::BodyStream;
+
+namespace Azure { namespace Storage {
+
+  // Reads up to `count` bytes into `buffer`, re-requesting the stream from the last offset
+  // through the HTTPGetter when the inner Read throws std::runtime_error.
+  int64_t ReliableStream::Read(Context& context, uint8_t* buffer, int64_t count)
+  {
+    if (this->m_options.DoInjectError)
+    {
+      throw std::runtime_error("Injected error");
+    }
+
+    for (int64_t intent = 1;; intent++)
+    {
+      // check if we need to get inner stream
+      if (this->m_inner == nullptr)
+      {
+        // Get a bodyStream that starts from the last known offset.
+        // If the getter throws, the exception bubbles up to the caller.
+        this->m_inner = this->m_httpGetter(context, this->m_retryInfo);
+      }
+      try
+      {
+        auto const readBytes = this->m_inner->Read(context, buffer, count);
+        // update offset
+        this->m_retryInfo.Offset += readBytes;
+        return readBytes;
+      }
+      catch (std::runtime_error const&) // todo: maybe log the exception in the future?
+      {
+        // Destroy the failed inner stream (reset() deletes it, cleaning up the network session;
+        // release() would only drop ownership and leak it). A new one is requested on next pass.
+        this->m_inner.reset();
+        if (intent >= this->m_options.MaxRetryRequests)
+        {
+          // Attempts exhausted (>= also covers MaxRetryRequests <= 0). End loop. Rethrow
+          throw;
+        }
+      }
+    }
+  }
+}} // namespace Azure::Storage