[Storage Common] Storage Retry Policy (#588)

* storage retry policy

* revert changes to azure core

* refactor storage retry policy

* fix typo

* change the logic a little bit

* adapt storage retry policy in blob batch client

* fix build error

* Add StorageRetryWithSecondaryOptions
This commit is contained in:
JinmingHu 2020-09-30 13:14:28 +08:00 committed by GitHub
parent 8c4c4e75a9
commit e238e28b0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 732 additions and 34 deletions

View File

@ -60,6 +60,7 @@ target_sources(
test/page_blob_client_test.cpp
test/page_blob_client_test.hpp
test/performance_benchmark.cpp
test/storage_retry_policy_test.cpp
)
target_link_libraries(azure-storage-test PUBLIC azure::storage::blobs)

View File

@ -5,6 +5,7 @@
#include "azure/storage/blobs/protocol/blob_rest_client.hpp"
#include "azure/storage/common/access_conditions.hpp"
#include "azure/storage/common/storage_retry_policy.hpp"
#include <limits>
#include <string>
@ -101,6 +102,11 @@ namespace Azure { namespace Storage { namespace Blobs {
* are applied to every retrial.
*/
std::vector<std::unique_ptr<Azure::Core::Http::HttpPolicy>> PerRetryPolicies;
/**
* @brief Specify the number of retries and other retry-related options.
*/
StorageRetryWithSecondaryOptions RetryOptions;
};
/**
@ -267,6 +273,11 @@ namespace Azure { namespace Storage { namespace Blobs {
* @brief Holds the encryption scope used when making requests.
*/
Azure::Core::Nullable<std::string> EncryptionScope;
/**
* @brief Specify the number of retries and other retry-related options.
*/
StorageRetryWithSecondaryOptions RetryOptions;
};
/**
@ -532,6 +543,11 @@ namespace Azure { namespace Storage { namespace Blobs {
* @brief Holds the encryption scope used when making requests.
*/
Azure::Core::Nullable<std::string> EncryptionScope;
/**
* @brief Specify the number of retries and other retry-related options.
*/
StorageRetryWithSecondaryOptions RetryOptions;
};
/**
@ -1436,6 +1452,11 @@ namespace Azure { namespace Storage { namespace Blobs {
* are applied to every retrial.
*/
std::vector<std::unique_ptr<Azure::Core::Http::HttpPolicy>> PerRetryPolicies;
/**
* @brief Specify the number of retries and other retry-related options.
*/
StorageRetryWithSecondaryOptions RetryOptions;
};
/**

View File

@ -98,7 +98,7 @@ namespace Azure { namespace Storage { namespace Blobs {
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
std::make_unique<Azure::Storage::StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -139,7 +139,7 @@ namespace Azure { namespace Storage { namespace Blobs {
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
std::make_unique<Azure::Storage::StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -183,7 +183,7 @@ namespace Azure { namespace Storage { namespace Blobs {
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
std::make_unique<Azure::Storage::StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());

View File

@ -54,8 +54,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -81,8 +80,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -108,8 +106,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -284,7 +281,7 @@ namespace Azure { namespace Storage { namespace Blobs {
auto returnTypeConverter = [](Azure::Core::Response<DownloadBlobResult>& response) {
DownloadBlobToResult ret;
ret.ETag = std::move(response->ETag);
ret.ETag = response->ETag;
ret.LastModified = std::move(response->LastModified);
ret.HttpHeaders = std::move(response->HttpHeaders);
ret.Metadata = std::move(response->Metadata);
@ -304,6 +301,10 @@ namespace Azure { namespace Storage { namespace Blobs {
chunkOptions.Context = options.Context;
chunkOptions.Offset = offset;
chunkOptions.Length = length;
if (!chunkOptions.AccessConditions.IfMatch.HasValue())
{
chunkOptions.AccessConditions.IfMatch = firstChunk->ETag;
}
auto chunk = Download(chunkOptions);
int64_t bytesRead = Azure::Core::Http::BodyStream::ReadToCount(
chunkOptions.Context,
@ -421,7 +422,7 @@ namespace Azure { namespace Storage { namespace Blobs {
auto returnTypeConverter = [](Azure::Core::Response<DownloadBlobResult>& response) {
DownloadBlobToResult ret;
ret.ETag = std::move(response->ETag);
ret.ETag = response->ETag;
ret.LastModified = std::move(response->LastModified);
ret.HttpHeaders = std::move(response->HttpHeaders);
ret.Metadata = std::move(response->Metadata);
@ -441,6 +442,10 @@ namespace Azure { namespace Storage { namespace Blobs {
chunkOptions.Context = options.Context;
chunkOptions.Offset = offset;
chunkOptions.Length = length;
if (!chunkOptions.AccessConditions.IfMatch.HasValue())
{
chunkOptions.AccessConditions.IfMatch = firstChunk->ETag;
}
auto chunk = Download(chunkOptions);
bodyStreamToFile(
*(chunk->BodyStream),

View File

@ -50,8 +50,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -77,8 +76,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -106,8 +104,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());

View File

@ -45,8 +45,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -72,8 +71,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());
@ -100,8 +98,7 @@ namespace Azure { namespace Storage { namespace Blobs {
{
policies.emplace_back(p->Clone());
}
policies.emplace_back(
std::make_unique<Azure::Core::Http::RetryPolicy>(Azure::Core::Http::RetryOptions()));
policies.emplace_back(std::make_unique<StorageRetryPolicy>(options.RetryOptions));
for (const auto& p : options.PerRetryPolicies)
{
policies.emplace_back(p->Clone());

View File

@ -322,20 +322,10 @@ namespace Azure { namespace Storage { namespace Test {
{
EXPECT_THROW(m_blobServiceClient.GetStatistics(), StorageError);
auto GetSecondaryUri = [](const std::string& uri) {
Azure::Core::Http::Url secondaryUri(uri);
std::string primaryHost = secondaryUri.GetHost();
auto dotPos = primaryHost.find(".");
std::string accountName = primaryHost.substr(0, dotPos);
std::string secondaryHost = accountName + "-secondary" + primaryHost.substr(dotPos);
secondaryUri.SetHost(secondaryHost);
return secondaryUri.GetAbsoluteUrl();
};
auto keyCredential
= Details::ParseConnectionString(StandardStorageConnectionString()).KeyCredential;
auto secondaryServiceClient
= Blobs::BlobServiceClient(GetSecondaryUri(m_blobServiceClient.GetUri()), keyCredential);
= Blobs::BlobServiceClient(InferSecondaryUri(m_blobServiceClient.GetUri()), keyCredential);
auto serviceStatistics = *secondaryServiceClient.GetStatistics();
EXPECT_NE(serviceStatistics.GeoReplication.Status, Blobs::BlobGeoReplicationStatus::Unknown);
EXPECT_TRUE(serviceStatistics.GeoReplication.LastSyncTime.HasValue());

View File

@ -0,0 +1,437 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "azure/storage/blobs/blob.hpp"
#include "test_base.hpp"
#include <chrono>
#include <memory>
#include <vector>
namespace Azure { namespace Storage { namespace Test {
class MockTransportPolicy : public Core::Http::HttpPolicy {
public:
MockTransportPolicy() {}
explicit MockTransportPolicy(std::string primaryContent)
: m_primaryContent(std::make_shared<std::string>(std::move(primaryContent))),
m_primaryETag(c_dummyETag)
{
}
explicit MockTransportPolicy(std::string primaryContent, std::string secondaryContent)
: m_primaryContent(std::make_shared<std::string>(std::move(primaryContent))),
m_secondaryContent(std::make_shared<std::string>(std::move(secondaryContent))),
m_primaryETag(c_dummyETag),
m_secondaryETag(*m_secondaryContent == *m_primaryContent ? c_dummyETag : c_dummyETag2)
{
}
~MockTransportPolicy() override {}
std::unique_ptr<HttpPolicy> Clone() const override
{
return std::make_unique<MockTransportPolicy>(*this);
}
std::unique_ptr<Core::Http::RawResponse> Send(
Core::Context const& context,
Core::Http::Request& request,
Core::Http::NextHttpPolicy nextHttpPolicy) const override
{
unused(context, nextHttpPolicy);
const auto requestHeaders = request.GetHeaders();
int64_t requestOffset = 0;
int64_t requestLength = std::numeric_limits<int64_t>::max();
{
std::string rangeStr;
if (requestHeaders.find("Range") != requestHeaders.end())
{
rangeStr = requestHeaders.at("Range");
}
else if (requestHeaders.find("x-ms-range") != requestHeaders.end())
{
rangeStr = requestHeaders.at("x-ms-range");
}
if (!rangeStr.empty())
{
auto equalPos = rangeStr.find('=');
auto dashPos = rangeStr.find('-');
std::string startByte = rangeStr.substr(equalPos + 1, dashPos - equalPos - 1);
requestOffset = std::stoll(startByte);
std::string endByte = rangeStr.substr(dashPos + 1);
if (!endByte.empty())
{
requestLength = std::stoll(endByte) - requestOffset + 1;
}
}
}
auto ConstructTransportException
= []() { return Azure::Core::Http::TransportException("Error while sending request. "); };
auto ConstructNotFoundResponse = []() {
auto requestId = Core::Uuid::CreateUuid().GetUuidString();
std::string errorResponseBody
= "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
"<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\n"
"RequestId:"
+ requestId + "\nTime:2020-09-11T02:09:31.8962056Z</Message></Error>";
auto response = std::make_unique<Core::Http::RawResponse>(Core::Http::RawResponse(
1, 1, Core::Http::HttpStatusCode::NotFound, "The specified blob does not exist."));
response->SetBody(std::vector<uint8_t>(errorResponseBody.begin(), errorResponseBody.end()));
response->AddHeader("content-length", std::to_string(errorResponseBody.length()));
response->AddHeader("content-type", "application/xml");
response->AddHeader("x-ms-request-id", Core::Uuid::CreateUuid().GetUuidString());
response->AddHeader("x-ms-version", Blobs::c_ApiVersion);
response->AddHeader("x-ms-error-code", "BlobNotFound");
response->AddHeader("date", ToRfc1123(std::chrono::system_clock::now()));
return response;
};
auto ConstructPreconditionFailedResponse = []() {
auto requestId = Core::Uuid::CreateUuid().GetUuidString();
std::string errorResponseBody
= "<?xml version=\"1.0\" encoding=\"utf-8\"?>"
"<Error><Code>ConditionNotMet</Code>"
"<Message>The condition specified using HTTP conditional header(s) is not met.\n"
"RequestId:"
+ requestId + "\nTime:2020-09-11T02:01:26.0151739Z</Message></Error>";
auto response = std::make_unique<Core::Http::RawResponse>(Core::Http::RawResponse(
1,
1,
Core::Http::HttpStatusCode::PreconditionFailed,
"The condition specified using HTTP conditional header(s) is not met."));
response->SetBody(std::vector<uint8_t>(errorResponseBody.begin(), errorResponseBody.end()));
response->AddHeader("content-length", std::to_string(errorResponseBody.length()));
response->AddHeader("content-type", "application/xml");
response->AddHeader("x-ms-request-id", Core::Uuid::CreateUuid().GetUuidString());
response->AddHeader("x-ms-version", Blobs::c_ApiVersion);
response->AddHeader("x-ms-error-code", "ConditionNotMet");
response->AddHeader("date", ToRfc1123(std::chrono::system_clock::now()));
return response;
};
auto ConstructPrimaryResponse
= [requestOffset, requestLength, this, ConstructNotFoundResponse]() {
if (!m_primaryContent)
{
return ConstructNotFoundResponse();
}
auto response = std::make_unique<Core::Http::RawResponse>(
Core::Http::RawResponse(1, 1, Core::Http::HttpStatusCode::Ok, "OK"));
int64_t bodyLength = std::min(
static_cast<int64_t>(m_primaryContent->length()) - requestOffset, requestLength);
auto bodyStream = std::make_unique<Core::Http::MemoryBodyStream>(
reinterpret_cast<const uint8_t*>(m_primaryContent->data() + requestOffset),
bodyLength);
response->SetBodyStream(std::move(bodyStream));
response->AddHeader("content-length", std::to_string(bodyLength));
response->AddHeader("etag", m_primaryETag);
response->AddHeader("last-modified", "Thu 27 Aug 2001 07:00:00 GMT");
response->AddHeader("x-ms-request-id", Core::Uuid::CreateUuid().GetUuidString());
response->AddHeader("x-ms-version", Blobs::c_ApiVersion);
response->AddHeader("x-ms-creation-time", "Thu 27 Aug 2002 07:00:00 GMT");
response->AddHeader("x-ms-lease-status", "unlocked");
response->AddHeader("x-ms-lease-state", "available");
response->AddHeader("x-ms-blob-type", "BlockBlob");
response->AddHeader("x-ms-server-encrypted", "true");
response->AddHeader("date", ToRfc1123(std::chrono::system_clock::now()));
return response;
};
auto ConstructSecondaryResponse =
[requestOffset, requestLength, this, ConstructNotFoundResponse]() {
if (!m_secondaryContent)
{
return ConstructNotFoundResponse();
}
auto response = std::make_unique<Core::Http::RawResponse>(
Core::Http::RawResponse(1, 1, Core::Http::HttpStatusCode::Ok, "OK"));
int64_t bodyLength = std::min(
static_cast<int64_t>(m_secondaryContent->length()) - requestOffset, requestLength);
auto bodyStream = std::make_unique<Core::Http::MemoryBodyStream>(
reinterpret_cast<const uint8_t*>(m_secondaryContent->data() + requestOffset),
bodyLength);
response->SetBodyStream(std::move(bodyStream));
response->AddHeader("content-length", std::to_string(bodyLength));
response->AddHeader("etag", m_secondaryETag);
response->AddHeader("last-modified", "Thu 27 Aug 2001 07:00:00 GMT");
response->AddHeader("x-ms-request-id", Core::Uuid::CreateUuid().GetUuidString());
response->AddHeader("x-ms-version", Blobs::c_ApiVersion);
response->AddHeader("x-ms-creation-time", "Thu 27 Aug 2002 07:00:00 GMT");
response->AddHeader("x-ms-lease-status", "unlocked");
response->AddHeader("x-ms-lease-state", "available");
response->AddHeader("x-ms-blob-type", "BlockBlob");
response->AddHeader("x-ms-server-encrypted", "true");
response->AddHeader("date", ToRfc1123(std::chrono::system_clock::now()));
return response;
};
Region region = Region::Primary;
if (request.GetUrl().GetHost().find("-secondary") != std::string::npos)
{
region = Region::Secondary;
}
if (m_failPolicy)
{
auto responseType = m_failPolicy(region);
switch (responseType)
{
case ResponseType::NotFound:
return ConstructNotFoundResponse();
case ResponseType::PreconditionFailed:
return ConstructPreconditionFailedResponse();
case ResponseType::TransportException:
throw ConstructTransportException();
default:;
}
}
if (region == Region::Primary)
{
if (requestHeaders.find("if-match") == requestHeaders.end()
|| requestHeaders.at("if-match") == m_primaryETag)
{
return ConstructPrimaryResponse();
}
return ConstructPreconditionFailedResponse();
}
else
{
if (requestHeaders.find("if-match") == requestHeaders.end()
|| requestHeaders.at("if-match") == m_secondaryETag)
{
return ConstructSecondaryResponse();
}
return ConstructPreconditionFailedResponse();
}
}
enum Region
{
Primary,
Secondary,
};
enum ResponseType
{
Success,
NotFound,
PreconditionFailed,
TransportException,
};
void SetFailPolicy(std::function<ResponseType(Region)> func) { m_failPolicy = std::move(func); }
private:
std::shared_ptr<std::string> m_primaryContent;
std::shared_ptr<std::string> m_secondaryContent;
std::string m_primaryETag;
std::string m_secondaryETag;
std::function<ResponseType(Region)> m_failPolicy;
};
TEST(StorageRetryPolicyTest, Basic)
{
std::string primaryContent = "primary content";
auto transportPolicyPtr = std::make_unique<MockTransportPolicy>(primaryContent);
Blobs::BlobClientOptions clientOptions;
clientOptions.PerRetryPolicies.emplace_back(std::move(transportPolicyPtr));
auto blobClient = Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString(), clientOptions);
auto ret = blobClient.Download();
auto responseBody
= Azure::Core::Http::BodyStream::ReadToEnd(Azure::Core::Context(), *(ret->BodyStream));
EXPECT_EQ(std::string(responseBody.begin(), responseBody.end()), primaryContent);
}
TEST(StorageRetryPolicyTest, Retry)
{
std::string primaryContent = "primary content";
auto transportPolicyPtr = std::make_unique<MockTransportPolicy>(primaryContent);
int numTrial = 0;
auto failPolicy
= [&numTrial](MockTransportPolicy::Region region) -> MockTransportPolicy::ResponseType {
unused(region);
if (numTrial++ == 0)
return MockTransportPolicy::ResponseType::TransportException;
return MockTransportPolicy::ResponseType::Success;
};
transportPolicyPtr->SetFailPolicy(failPolicy);
Blobs::BlobClientOptions clientOptions;
clientOptions.PerRetryPolicies.emplace_back(std::move(transportPolicyPtr));
int64_t delayMs = 1000;
clientOptions.RetryOptions.RetryDelay = std::chrono::milliseconds(delayMs);
auto blobClient = Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString(), clientOptions);
auto timeBegin = std::chrono::steady_clock::now();
auto ret = blobClient.Download();
auto timeEnd = std::chrono::steady_clock::now();
auto responseBody
= Azure::Core::Http::BodyStream::ReadToEnd(Azure::Core::Context(), *(ret->BodyStream));
EXPECT_EQ(std::string(responseBody.begin(), responseBody.end()), primaryContent);
EXPECT_EQ(numTrial, 2);
int64_t elapsedTime
= std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd - timeBegin).count();
EXPECT_GE(elapsedTime, delayMs * 0.5);
EXPECT_LE(elapsedTime, delayMs * 2);
}
TEST(StorageRetryPolicyTest, Failover)
{
std::string primaryContent = "primary content";
std::string secondaryContent = "secondary content";
auto transportPolicyPtr
= std::make_unique<MockTransportPolicy>(primaryContent, secondaryContent);
auto failPolicy = [](MockTransportPolicy::Region region) -> MockTransportPolicy::ResponseType {
if (region == MockTransportPolicy::Region::Primary)
{
return MockTransportPolicy::ResponseType::TransportException;
}
return MockTransportPolicy::ResponseType::Success;
};
transportPolicyPtr->SetFailPolicy(failPolicy);
Blobs::BlobClientOptions clientOptions;
clientOptions.PerRetryPolicies.emplace_back(std::move(transportPolicyPtr));
clientOptions.RetryOptions.RetryDelay = std::chrono::milliseconds(0);
{
std::string primaryUrl
= Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString())
.GetUri();
std::string secondaryUrl = InferSecondaryUri(primaryUrl);
std::string secondaryHost = Core::Http::Url(secondaryUrl).GetHost();
clientOptions.RetryOptions.SecondaryHostForRetryReads = secondaryHost;
}
auto blobClient = Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString(), clientOptions);
auto ret = blobClient.Download();
auto responseBody
= Azure::Core::Http::BodyStream::ReadToEnd(Azure::Core::Context(), *(ret->BodyStream));
EXPECT_EQ(std::string(responseBody.begin(), responseBody.end()), secondaryContent);
}
TEST(StorageRetryPolicyTest, Secondary404)
{
std::string primaryContent = "primary content";
std::string secondaryContent = "secondary content";
auto transportPolicyPtr
= std::make_unique<MockTransportPolicy>(primaryContent, secondaryContent);
int numPrimaryTrial = 0;
int numSecondaryTrial = 0;
auto failPolicy = [&numPrimaryTrial, &numSecondaryTrial](
MockTransportPolicy::Region region) -> MockTransportPolicy::ResponseType {
if (region == MockTransportPolicy::Region::Primary)
{
if (numPrimaryTrial++ < 2)
{
return MockTransportPolicy::ResponseType::TransportException;
}
return MockTransportPolicy::ResponseType::Success;
}
else
{
numSecondaryTrial++;
return MockTransportPolicy::ResponseType::NotFound;
}
};
transportPolicyPtr->SetFailPolicy(failPolicy);
Blobs::BlobClientOptions clientOptions;
clientOptions.PerRetryPolicies.emplace_back(std::move(transportPolicyPtr));
clientOptions.RetryOptions.MaxRetries = 3;
clientOptions.RetryOptions.RetryDelay = std::chrono::milliseconds(0);
{
std::string primaryUrl
= Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString())
.GetUri();
std::string secondaryUrl = InferSecondaryUri(primaryUrl);
std::string secondaryHost = Core::Http::Url(secondaryUrl).GetHost();
clientOptions.RetryOptions.SecondaryHostForRetryReads = secondaryHost;
}
auto blobClient = Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString(), clientOptions);
auto ret = blobClient.Download();
auto responseBody
= Azure::Core::Http::BodyStream::ReadToEnd(Azure::Core::Context(), *(ret->BodyStream));
EXPECT_EQ(std::string(responseBody.begin(), responseBody.end()), primaryContent);
EXPECT_EQ(numPrimaryTrial, 3);
EXPECT_EQ(numSecondaryTrial, 1);
}
TEST(StorageRetryPolicyTest, Secondary412)
{
std::string primaryContent = "primary content";
std::string secondaryContent = "secondary content";
auto transportPolicyPtr
= std::make_unique<MockTransportPolicy>(primaryContent, secondaryContent);
int numPrimaryTrial = 0;
int numSecondaryTrial = 0;
auto failPolicy = [&numPrimaryTrial, &numSecondaryTrial](
MockTransportPolicy::Region region) -> MockTransportPolicy::ResponseType {
if (region == MockTransportPolicy::Region::Primary)
{
numPrimaryTrial++;
if (numPrimaryTrial % 2 == 1)
{
return MockTransportPolicy::ResponseType::Success;
}
else
{
return MockTransportPolicy::ResponseType::TransportException;
}
}
else
{
numSecondaryTrial++;
return MockTransportPolicy::ResponseType::Success;
}
};
transportPolicyPtr->SetFailPolicy(failPolicy);
Blobs::BlobClientOptions clientOptions;
clientOptions.PerRetryPolicies.emplace_back(std::move(transportPolicyPtr));
clientOptions.RetryOptions.MaxRetries = 3;
clientOptions.RetryOptions.RetryDelay = std::chrono::milliseconds(0);
{
std::string primaryUrl
= Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString())
.GetUri();
std::string secondaryUrl = InferSecondaryUri(primaryUrl);
std::string secondaryHost = Core::Http::Url(secondaryUrl).GetHost();
clientOptions.RetryOptions.SecondaryHostForRetryReads = secondaryHost;
}
auto blobClient = Azure::Storage::Blobs::BlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), RandomString(), RandomString(), clientOptions);
std::string downloadBuffer;
downloadBuffer.resize(std::max(primaryContent.size(), secondaryContent.size()));
Blobs::DownloadBlobToOptions options;
options.InitialChunkSize = 2;
options.ChunkSize = 2;
options.Concurrency = 1;
blobClient.DownloadTo(
reinterpret_cast<uint8_t*>(&downloadBuffer[0]),
static_cast<int64_t>(downloadBuffer.size()),
options);
downloadBuffer.resize(primaryContent.size());
EXPECT_EQ(downloadBuffer, primaryContent);
EXPECT_NE(numPrimaryTrial, 0);
EXPECT_NE(numSecondaryTrial, 0);
}
}}} // namespace Azure::Storage::Test

View File

@ -17,6 +17,7 @@ set(AZURE_STORAGE_COMMON_HEADER
inc/azure/storage/common/storage_credential.hpp
inc/azure/storage/common/storage_error.hpp
inc/azure/storage/common/storage_per_retry_policy.hpp
inc/azure/storage/common/storage_retry_policy.hpp
inc/azure/storage/common/storage_version.hpp
inc/azure/storage/common/xml_wrapper.hpp
)
@ -31,6 +32,7 @@ set(AZURE_STORAGE_COMMON_SOURCE
src/storage_credential.cpp
src/storage_error.cpp
src/storage_per_retry_policy.cpp
src/storage_retry_policy.cpp
src/storage_version.cpp
src/xml_wrapper.cpp
)

View File

@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#pragma once
#include "azure/core/http/policy.hpp"
#include <memory>
#include <string>
namespace Azure { namespace Storage {
/**
* StorageRetryOptions configures the retry policy's behavior.
*/
struct StorageRetryOptions
{
/**
* @brief Maximum number of attempts to retry.
*/
int MaxRetries = 3;
/**
* @brief Mimimum amount of time between retry attempts.
*/
std::chrono::milliseconds RetryDelay = std::chrono::seconds(4);
/**
* @brief Mimimum amount of time between retry attempts.
*/
std::chrono::milliseconds MaxRetryDelay = std::chrono::minutes(2);
/**
* @brief HTTP status codes to retry on.
*/
std::vector<Azure::Core::Http::HttpStatusCode> StatusCodes{
Azure::Core::Http::HttpStatusCode::RequestTimeout,
Azure::Core::Http::HttpStatusCode::InternalServerError,
Azure::Core::Http::HttpStatusCode::BadGateway,
Azure::Core::Http::HttpStatusCode::ServiceUnavailable,
Azure::Core::Http::HttpStatusCode::GatewayTimeout,
};
};
/**
* StorageRetryWithSecondaryOptions configures whether the retry policy should retry a read
* operation against another host.
*/
struct StorageRetryWithSecondaryOptions : public StorageRetryOptions
{
/**
* SecondaryHostForRetryReads specifies whether the retry policy should retry a read
* operation against another host. If SecondaryHostForRetryReads is "" (the default) then
* operations are not retried against another host. NOTE: Before setting this field, make sure
* you understand the issues around reading stale & potentially-inconsistent data at this
* webpage: https://docs.microsoft.com/en-us/azure/storage/common/geo-redundant-design.
*/
std::string SecondaryHostForRetryReads;
};
class StorageRetryPolicy : public Azure::Core::Http::HttpPolicy {
public:
explicit StorageRetryPolicy(const StorageRetryOptions& options)
{
m_options.MaxRetries = options.MaxRetries;
m_options.RetryDelay = options.RetryDelay;
m_options.MaxRetryDelay = options.MaxRetryDelay;
m_options.StatusCodes = options.StatusCodes;
}
explicit StorageRetryPolicy(const StorageRetryWithSecondaryOptions& options)
: m_options(options)
{
}
std::unique_ptr<Azure::Core::Http::HttpPolicy> Clone() const override
{
return std::make_unique<StorageRetryPolicy>(*this);
}
std::unique_ptr<Azure::Core::Http::RawResponse> Send(
const Azure::Core::Context& ctx,
Azure::Core::Http::Request& request,
Azure::Core::Http::NextHttpPolicy nextHttpPolicy) const override;
private:
StorageRetryWithSecondaryOptions m_options;
};
}} // namespace Azure::Storage

View File

@ -0,0 +1,141 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "azure/storage/common/storage_retry_policy.hpp"
#include "azure/storage/common/storage_common.hpp"
#include <thread>
namespace Azure { namespace Storage {
std::unique_ptr<Azure::Core::Http::RawResponse> StorageRetryPolicy::Send(
const Azure::Core::Context& ctx,
Azure::Core::Http::Request& request,
Azure::Core::Http::NextHttpPolicy nextHttpPolicy) const
{
bool considerSecondary = (request.GetMethod() == Azure::Core::Http::HttpMethod::Get
|| request.GetMethod() == Azure::Core::Http::HttpMethod::Head)
&& !m_options.SecondaryHostForRetryReads.empty();
std::string primaryHost = request.GetUrl().GetHost();
const std::string& secondaryHost = m_options.SecondaryHostForRetryReads;
bool isUsingSecondary = false;
auto switchHost
= [&considerSecondary, &isUsingSecondary, &request, &primaryHost, &secondaryHost]() {
if (considerSecondary)
{
isUsingSecondary = !isUsingSecondary;
}
else
{
isUsingSecondary = false;
}
if (isUsingSecondary)
{
request.GetUrl().SetHost(secondaryHost);
}
else
{
request.GetUrl().SetHost(primaryHost);
}
};
std::unique_ptr<Azure::Core::Http::RawResponse> pResponse;
for (int i = 0; i <= m_options.MaxRetries; ++i)
{
bool lastAttempt = i == m_options.MaxRetries;
try
{
auto response = nextHttpPolicy.Send(ctx, request);
bool shouldRetry = false;
if (isUsingSecondary)
{
if (response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::NotFound
|| response->GetStatusCode() == Core::Http::HttpStatusCode::PreconditionFailed)
{
considerSecondary = false;
// disgard this response
shouldRetry = true;
}
}
shouldRetry |= response
&& std::find(
m_options.StatusCodes.begin(),
m_options.StatusCodes.end(),
response->GetStatusCode())
!= m_options.StatusCodes.end();
pResponse = std::move(response);
if (!shouldRetry)
{
break;
}
}
catch (Azure::Core::Http::CouldNotResolveHostException const&)
{
if (lastAttempt)
{
throw;
}
}
catch (Azure::Core::Http::TransportException const&)
{
if (lastAttempt)
{
throw;
}
}
if (!lastAttempt)
{
if (auto bodyStream = request.GetBodyStream())
{
bodyStream->Rewind();
}
switchHost();
ctx.ThrowIfCanceled();
const int64_t baseRetryDelayMs = m_options.RetryDelay.count();
const int64_t maxRetryDelayMs = m_options.MaxRetryDelay.count();
int64_t retryDelayMs = maxRetryDelayMs;
if (static_cast<std::size_t>(i) < sizeof(int64_t) * 8)
{
const int64_t factor = 1LL << i;
retryDelayMs = baseRetryDelayMs * factor;
if (baseRetryDelayMs != 0 && retryDelayMs / baseRetryDelayMs != factor)
{
retryDelayMs = maxRetryDelayMs;
}
else
{
static thread_local std::random_device rd;
std::mt19937_64 gen(rd());
std::uniform_real_distribution<> dist(0.8, 1.3);
retryDelayMs = static_cast<decltype(retryDelayMs)>(retryDelayMs * dist(gen));
if (retryDelayMs < 0 || retryDelayMs > maxRetryDelayMs)
{
retryDelayMs = maxRetryDelayMs;
}
}
}
if (retryDelayMs != 0)
{
std::this_thread::sleep_for(std::chrono::milliseconds(retryDelayMs));
}
}
}
return pResponse;
}
}} // namespace Azure::Storage

View File

@ -18,6 +18,8 @@
#include <sstream>
#include <string>
#include "azure/core/http/http.hpp"
namespace Azure { namespace Storage { namespace Test {
constexpr static const char* c_StandardStorageConnectionString = "";
@ -283,4 +285,16 @@ namespace Azure { namespace Storage { namespace Test {
return std::chrono::system_clock::from_time_t(tt);
}
std::string InferSecondaryUri(const std::string primaryUri)
{
Azure::Core::Http::Url secondaryUri(primaryUri);
std::string primaryHost = secondaryUri.GetHost();
auto dotPos = primaryHost.find(".");
std::string accountName = primaryHost.substr(0, dotPos);
std::string secondaryHost = accountName + "-secondary" + primaryHost.substr(dotPos);
secondaryUri.SetHost(secondaryHost);
return secondaryUri.GetAbsoluteUrl();
}
}}} // namespace Azure::Storage::Test

View File

@ -39,6 +39,7 @@ namespace Azure { namespace Storage { namespace Test {
}
constexpr static const char* c_dummyETag = "0x8D83B58BDF51D75";
constexpr static const char* c_dummyETag2 = "0x8D812645BFB0CDE";
constexpr static const char* c_dummyMd5 = "tQbD1aMPeB+LiPffUwFQJQ==";
constexpr static const char* c_dummyCrc64 = "+DNR5PON4EM=";
@ -78,4 +79,6 @@ namespace Azure { namespace Storage { namespace Test {
std::chrono::system_clock::time_point FromRfc1123(const std::string& timeStr);
std::string InferSecondaryUri(const std::string primaryUri);
}}} // namespace Azure::Storage::Test