[Storage Blob Service] integrate reliable stream (#421)
* integrate reliable stream * define c_reliableStreamRetryCount
This commit is contained in:
parent
cc0ff27eec
commit
329a30f0e3
@ -18,4 +18,6 @@ namespace Azure { namespace Storage { namespace Details {
|
||||
constexpr static const char* c_HttpHeaderClientRequestId = "x-ms-client-request-id";
|
||||
constexpr static const char* c_HttpHeaderContentType = "content-type";
|
||||
constexpr static const char* c_defaultSasVersion = "2019-12-12";
|
||||
|
||||
constexpr int c_reliableStreamRetryCount = 3;
|
||||
}}} // namespace Azure::Storage::Details
|
||||
|
||||
@ -26,8 +26,6 @@ namespace Azure { namespace Storage {
|
||||
{
|
||||
// configures the maximun retries to be done.
|
||||
int64_t MaxRetryRequests;
|
||||
// Use for testing purposes only.
|
||||
bool DoInjectError;
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
#include "common/concurrent_transfer.hpp"
|
||||
#include "common/constants.hpp"
|
||||
#include "common/file_io.hpp"
|
||||
#include "common/reliable_stream.hpp"
|
||||
#include "common/shared_key_policy.hpp"
|
||||
#include "common/storage_common.hpp"
|
||||
#include "common/storage_version.hpp"
|
||||
@ -170,8 +171,40 @@ namespace Azure { namespace Storage { namespace Blobs {
|
||||
protocolLayerOptions.IfMatch = options.AccessConditions.IfMatch;
|
||||
protocolLayerOptions.IfNoneMatch = options.AccessConditions.IfNoneMatch;
|
||||
|
||||
return BlobRestClient::Blob::Download(
|
||||
auto downloadResponse = BlobRestClient::Blob::Download(
|
||||
options.Context, *m_pipeline, m_blobUrl.ToString(), protocolLayerOptions);
|
||||
|
||||
{
|
||||
// In case network failure during reading the body
|
||||
std::string eTag = downloadResponse->ETag;
|
||||
|
||||
auto retryFunction
|
||||
= [this, options, eTag](
|
||||
const Azure::Core::Context& context,
|
||||
const HTTPGetterInfo& retryInfo) -> std::unique_ptr<Azure::Core::Http::BodyStream> {
|
||||
unused(context);
|
||||
|
||||
DownloadBlobOptions newOptions = options;
|
||||
newOptions.Offset
|
||||
= (options.Offset.HasValue() ? options.Offset.GetValue() : 0) + retryInfo.Offset;
|
||||
newOptions.Length = options.Length;
|
||||
if (newOptions.Length.HasValue())
|
||||
{
|
||||
newOptions.Length = options.Length.GetValue() - retryInfo.Offset;
|
||||
}
|
||||
if (!newOptions.AccessConditions.IfMatch.HasValue())
|
||||
{
|
||||
newOptions.AccessConditions.IfMatch = eTag;
|
||||
}
|
||||
return std::move(Download(newOptions)->BodyStream);
|
||||
};
|
||||
|
||||
ReliableStreamOptions reliableStreamOptions;
|
||||
reliableStreamOptions.MaxRetryRequests = Details::c_reliableStreamRetryCount;
|
||||
downloadResponse->BodyStream = std::make_unique<ReliableStream>(
|
||||
std::move(downloadResponse->BodyStream), reliableStreamOptions, retryFunction);
|
||||
}
|
||||
return downloadResponse;
|
||||
}
|
||||
|
||||
Azure::Core::Response<BlobDownloadInfo> BlobClient::DownloadToBuffer(
|
||||
|
||||
@ -11,11 +11,6 @@ namespace Azure { namespace Storage {
|
||||
|
||||
int64_t ReliableStream::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
if (this->m_options.DoInjectError)
|
||||
{
|
||||
throw std::runtime_error("Injected error");
|
||||
}
|
||||
|
||||
for (int64_t intent = 1;; intent++)
|
||||
{
|
||||
// check if we need to get inner stream
|
||||
|
||||
Loading…
Reference in New Issue
Block a user