move HttpGetter and HttpGetterInfo to internal namespace (#2060)
* move HttpGetter and HttpGetterInfo to internal namespace * move to _internal
This commit is contained in:
parent
aa0a388e98
commit
a786595565
@ -177,17 +177,16 @@ namespace Azure { namespace Storage { namespace Blobs {
|
||||
// In case network failure during reading the body
|
||||
const Azure::ETag eTag = downloadResponse.Value.Details.ETag;
|
||||
|
||||
auto retryFunction =
|
||||
[this, options, eTag](
|
||||
const HttpGetterInfo& retryInfo,
|
||||
const Azure::Core::Context& context) -> std::unique_ptr<Azure::Core::IO::BodyStream> {
|
||||
auto retryFunction
|
||||
= [this, options, eTag](int64_t retryOffset, const Azure::Core::Context& context)
|
||||
-> std::unique_ptr<Azure::Core::IO::BodyStream> {
|
||||
DownloadBlobOptions newOptions = options;
|
||||
newOptions.Range = Core::Http::HttpRange();
|
||||
newOptions.Range.Value().Offset
|
||||
= (options.Range.HasValue() ? options.Range.Value().Offset : 0) + retryInfo.Offset;
|
||||
= (options.Range.HasValue() ? options.Range.Value().Offset : 0) + retryOffset;
|
||||
if (options.Range.HasValue() && options.Range.Value().Length.HasValue())
|
||||
{
|
||||
newOptions.Range.Value().Length = options.Range.Value().Length.Value() - retryInfo.Offset;
|
||||
newOptions.Range.Value().Length = options.Range.Value().Length.Value() - retryOffset;
|
||||
}
|
||||
if (!newOptions.AccessConditions.IfMatch.HasValue())
|
||||
{
|
||||
@ -196,9 +195,9 @@ namespace Azure { namespace Storage { namespace Blobs {
|
||||
return std::move(Download(newOptions, context).Value.BodyStream);
|
||||
};
|
||||
|
||||
ReliableStreamOptions reliableStreamOptions;
|
||||
_internal::ReliableStreamOptions reliableStreamOptions;
|
||||
reliableStreamOptions.MaxRetryRequests = _internal::ReliableStreamRetryCount;
|
||||
downloadResponse.Value.BodyStream = std::make_unique<ReliableStream>(
|
||||
downloadResponse.Value.BodyStream = std::make_unique<_internal::ReliableStream>(
|
||||
std::move(downloadResponse.Value.BodyStream), reliableStreamOptions, retryFunction);
|
||||
}
|
||||
if (downloadResponse.Value.BlobType == Models::BlobType::AppendBlob
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
### Breaking Changes
|
||||
|
||||
- Removed `Azure::Storage::Common::PackageVersion`.
|
||||
- Moved `ReliableStream` to internal namespace.
|
||||
|
||||
## 12.0.0-beta.9 (2021-03-23)
|
||||
|
||||
|
||||
@ -8,19 +8,7 @@
|
||||
#include <azure/core/context.hpp>
|
||||
#include <azure/core/io/body_stream.hpp>
|
||||
|
||||
namespace Azure { namespace Storage {
|
||||
|
||||
// options used by the fm callback that will get a bodyStream starting from last offset
|
||||
struct HttpGetterInfo
|
||||
{
|
||||
int64_t Offset = 0;
|
||||
};
|
||||
|
||||
// Defines a fn signature to be use to get a bodyStream from a specific offset.
|
||||
typedef std::function<std::unique_ptr<Azure::Core::IO::BodyStream>(
|
||||
HttpGetterInfo const&,
|
||||
Azure::Core::Context const&)>
|
||||
HTTPGetter;
|
||||
namespace Azure { namespace Storage { namespace _internal {
|
||||
|
||||
// Options used by reliable stream
|
||||
struct ReliableStreamOptions
|
||||
@ -48,9 +36,11 @@ namespace Azure { namespace Storage {
|
||||
// Configuration for the re-triable stream
|
||||
ReliableStreamOptions const m_options;
|
||||
// callback to get a bodyStream in case Read operation fails
|
||||
HTTPGetter m_httpGetter;
|
||||
std::function<
|
||||
std::unique_ptr<Azure::Core::IO::BodyStream>(int64_t, Azure::Core::Context const&)>
|
||||
m_streamReconnector;
|
||||
// Options to use when getting a new bodyStream like current offset
|
||||
HttpGetterInfo m_retryInfo;
|
||||
int64_t m_retryOffset;
|
||||
|
||||
int64_t OnRead(uint8_t* buffer, int64_t count, Azure::Core::Context const& context) override;
|
||||
|
||||
@ -58,8 +48,11 @@ namespace Azure { namespace Storage {
|
||||
explicit ReliableStream(
|
||||
std::unique_ptr<Azure::Core::IO::BodyStream> inner,
|
||||
ReliableStreamOptions const options,
|
||||
HTTPGetter httpGetter)
|
||||
: m_inner(std::move(inner)), m_options(options), m_httpGetter(std::move(httpGetter))
|
||||
std::function<
|
||||
std::unique_ptr<Azure::Core::IO::BodyStream>(int64_t, Azure::Core::Context const&)>
|
||||
streamReconnector)
|
||||
: m_inner(std::move(inner)), m_options(options),
|
||||
m_streamReconnector(std::move(streamReconnector)), m_retryOffset(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -68,8 +61,8 @@ namespace Azure { namespace Storage {
|
||||
{
|
||||
// Rewind directly from a transportAdapter body stream (like libcurl) would throw
|
||||
this->m_inner->Rewind();
|
||||
this->m_retryInfo.Offset = 0;
|
||||
this->m_retryOffset = 0;
|
||||
}
|
||||
};
|
||||
|
||||
}} // namespace Azure::Storage
|
||||
}}} // namespace Azure::Storage::_internal
|
||||
|
||||
@ -8,7 +8,7 @@
|
||||
using Azure::Core::Context;
|
||||
using Azure::Core::IO::BodyStream;
|
||||
|
||||
namespace Azure { namespace Storage {
|
||||
namespace Azure { namespace Storage { namespace _internal {
|
||||
|
||||
int64_t ReliableStream::OnRead(uint8_t* buffer, int64_t count, Context const& context)
|
||||
{
|
||||
@ -22,13 +22,13 @@ namespace Azure { namespace Storage {
|
||||
// if this fails, throw bubbles up
|
||||
// As m_inner is unique_pr, it will be destructed on reassignment, cleaning up network
|
||||
// session.
|
||||
this->m_inner = this->m_httpGetter(this->m_retryInfo, context);
|
||||
this->m_inner = this->m_streamReconnector(this->m_retryOffset, context);
|
||||
}
|
||||
try
|
||||
{
|
||||
auto const readBytes = this->m_inner->Read(buffer, count, context);
|
||||
// update offset
|
||||
this->m_retryInfo.Offset += readBytes;
|
||||
this->m_retryOffset += readBytes;
|
||||
return readBytes;
|
||||
}
|
||||
catch (std::runtime_error const& e)
|
||||
@ -46,4 +46,4 @@ namespace Azure { namespace Storage {
|
||||
}
|
||||
}
|
||||
}
|
||||
}} // namespace Azure::Storage
|
||||
}}} // namespace Azure::Storage::_internal
|
||||
|
||||
@ -258,17 +258,16 @@ namespace Azure { namespace Storage { namespace Files { namespace Shares {
|
||||
// In case network failure during reading the body
|
||||
auto eTag = downloadResponse.Value.ETag;
|
||||
|
||||
auto retryFunction =
|
||||
[this, options, eTag](
|
||||
const HttpGetterInfo& retryInfo,
|
||||
const Azure::Core::Context& context) -> std::unique_ptr<Azure::Core::IO::BodyStream> {
|
||||
auto retryFunction
|
||||
= [this, options, eTag](int64_t retryOffset, const Azure::Core::Context& context)
|
||||
-> std::unique_ptr<Azure::Core::IO::BodyStream> {
|
||||
DownloadFileOptions newOptions = options;
|
||||
newOptions.Range = Core::Http::HttpRange();
|
||||
newOptions.Range.Value().Offset
|
||||
= (options.Range.HasValue() ? options.Range.Value().Offset : 0) + retryInfo.Offset;
|
||||
= (options.Range.HasValue() ? options.Range.Value().Offset : 0) + retryOffset;
|
||||
if (options.Range.HasValue() && options.Range.Value().Length.HasValue())
|
||||
{
|
||||
newOptions.Range.Value().Length = options.Range.Value().Length.Value() - retryInfo.Offset;
|
||||
newOptions.Range.Value().Length = options.Range.Value().Length.Value() - retryOffset;
|
||||
}
|
||||
|
||||
auto newResponse = Download(newOptions, context);
|
||||
@ -280,9 +279,9 @@ namespace Azure { namespace Storage { namespace Files { namespace Shares {
|
||||
return std::move(newResponse.Value.BodyStream);
|
||||
};
|
||||
|
||||
ReliableStreamOptions reliableStreamOptions;
|
||||
_internal::ReliableStreamOptions reliableStreamOptions;
|
||||
reliableStreamOptions.MaxRetryRequests = _internal::ReliableStreamRetryCount;
|
||||
downloadResponse.Value.BodyStream = std::make_unique<ReliableStream>(
|
||||
downloadResponse.Value.BodyStream = std::make_unique<_internal::ReliableStream>(
|
||||
std::move(downloadResponse.Value.BodyStream), reliableStreamOptions, retryFunction);
|
||||
}
|
||||
Models::DownloadFileResult ret;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user