Retriable stream (#369)

* Adding retriable_stream
This commit is contained in:
Victor Vazquez 2020-08-03 10:09:12 -07:00 committed by GitHub
parent 22f4ba12fc
commit 5426cf7a07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 128 additions and 2 deletions

View File

@ -403,8 +403,6 @@ namespace Azure { namespace Core { namespace Http {
int64_t Length() const override { return this->m_contentLength; }
void Rewind() override {}
int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override;
};

View File

@ -21,6 +21,7 @@ set(AZURE_STORAGE_COMMON_HEADER
inc/common/constants.hpp
inc/common/crypt.hpp
inc/common/file_io.hpp
inc/common/reliable_stream.hpp
inc/common/shared_key_policy.hpp
inc/common/storage_common.hpp
inc/common/storage_credential.hpp
@ -35,6 +36,7 @@ set(AZURE_STORAGE_COMMON_SOURCE
src/common/common_headers_request_policy.cpp
src/common/crypt.cpp
src/common/file_io.cpp
src/common/reliable_stream.cpp
src/common/shared_key_policy.cpp
src/common/storage_credential.cpp
src/common/storage_error.cpp

View File

@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "context.hpp"
#include "http/body_stream.hpp"
#include <algorithm>
#include <functional>
namespace Azure { namespace Storage {
// options used by the fm callback that will get a bodyStream starting from last offset
struct HTTPGetterInfo
{
int64_t Offset = 0;
};
// Defines a fn signature to be use to get a bodyStream from an specific offset.
typedef std::function<
std::unique_ptr<Azure::Core::Http::BodyStream>(Azure::Core::Context&, HTTPGetterInfo const&)>
HTTPGetter;
// Options used by reliable stream
struct ReliableStreamOptions
{
// configures the maximun retries to be done.
int64_t MaxRetryRequests;
// Use for testing purposes only.
bool DoInjectError;
};
/**
* @brief Decorates a body stream by providing reliability while readind from it.
* ReliableStream uses an HTTPGetter callback (provided on constructor) to get a bodyStream
* starting on last known offset to resume a fail Read() operation.
*
* @remark An HTTPGetter callback is expected to verify the initial `eTag` from first http request
* to ensure read operation will continue on the same content.
*
* @remark An HTTPGetter callback is expected to calculate and set the range header based on the
* offset provided by the ReliableStream.
*
*/
class ReliableStream : public Azure::Core::Http::BodyStream {
private:
// initial bodyStream.
std::unique_ptr<Azure::Core::Http::BodyStream> m_inner;
// Configuration for the re-triable stream
ReliableStreamOptions const m_options;
// callback to get a bodyStream in case Read operation fails
HTTPGetter m_httpGetter;
// Options to use when getting a new bodyStream like current offset
HTTPGetterInfo m_retryInfo;
public:
explicit ReliableStream(
std::unique_ptr<Azure::Core::Http::BodyStream> inner,
ReliableStreamOptions const options,
HTTPGetter httpGetter)
: m_inner(std::move(inner)), m_options(options), m_httpGetter(std::move(httpGetter))
{
}
int64_t Length() const override { return this->m_inner->Length(); }
void Rewind() override
{
// Rewind directly from a transportAdapter body stream (like libcurl) would throw
this->m_inner->Rewind();
this->m_retryInfo.Offset = 0;
}
int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override;
};
}} // namespace Azure::Storage

View File

@ -0,0 +1,52 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "common/reliable_stream.hpp"
#include "http/http.hpp"
using Azure::Core::Context;
using Azure::Core::Http::BodyStream;
namespace Azure { namespace Storage {
int64_t ReliableStream::Read(Context& context, uint8_t* buffer, int64_t count)
{
if (this->m_options.DoInjectError)
{
throw std::runtime_error("Injected error");
}
for (int64_t intent = 1;; intent++)
{
// check if we need to get inner stream
if (this->m_inner == nullptr)
{
// Get a a bodyStream that starts from last known offset
// if this fails, throw bubbles up
// As m_inner is unique_pr, it will be destructed on reassignment, cleaning up network
// session.
this->m_inner = this->m_httpGetter(context, this->m_retryInfo);
}
try
{
auto const readBytes = this->m_inner->Read(context, buffer, count);
// update offset
this->m_retryInfo.Offset += readBytes;
return readBytes;
}
catch (std::runtime_error const& e)
{
// forget about the inner stream. We will need to request a new one
// As m_inner is unique_pr, it will be destructed on reassignment (cleaning up network
// session).
this->m_inner.release();
(void)e; // todo: maybe log the exception in the future?
if (intent == this->m_options.MaxRetryRequests)
{
// max retry. End loop. Rethrow
throw;
}
}
}
}
}} // namespace Azure::Storage