Pipeline/download content on pipeline to let Retry policy to handle read/get issues (#323)

make upload chunk size custumizable
This commit is contained in:
Victor Vazquez 2020-07-27 11:40:40 -07:00 committed by GitHub
parent 573ffdf1f7
commit ef0c530ee0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 252 additions and 39 deletions

View File

@ -25,6 +25,7 @@ add_library (
src/http/request.cpp
src/http/raw_response.cpp
src/http/retry_policy.cpp
src/http/transport_policy.cpp
src/http/telemetry_policy.cpp
src/http/url.cpp
src/http/winhttp/win_http_transport.cpp

View File

@ -204,6 +204,21 @@ namespace Azure { namespace Core {
return empty;
}
bool HasKey(const std::string& key)
{
if (!key.empty())
{
for (auto ptr = m_contextSharedState; ptr; ptr = ptr->Parent)
{
if (ptr->Key == key)
{
return true;
}
}
}
return false;
}
void Cancel() { m_contextSharedState->CancelAt = time_point::min(); }
void ThrowIfCanceled()
@ -216,4 +231,5 @@ namespace Azure { namespace Core {
}
};
Context& GetApplicationContext();
}} // namespace Azure::Core

View File

@ -285,6 +285,7 @@ namespace Azure { namespace Core { namespace Http {
std::map<std::string, std::string> m_headers;
std::unique_ptr<BodyStream> m_bodyStream;
std::vector<uint8_t> m_body;
explicit RawResponse(
int32_t majorVersion,
@ -313,6 +314,7 @@ namespace Azure { namespace Core { namespace Http {
void AddHeader(std::string const& header);
void AddHeader(uint8_t const* const begin, uint8_t const* const last);
void SetBodyStream(std::unique_ptr<BodyStream> stream);
void SetBody(std::vector<uint8_t> body) { this->m_body = std::move(body); }
// adding getters for version and stream body. Clang will complain on Mac if we have unused
// fields in a class
@ -326,6 +328,8 @@ namespace Azure { namespace Core { namespace Http {
// If m_bodyStream was moved before. nullpr is returned
return std::move(this->m_bodyStream);
}
std::vector<uint8_t>& GetBody() { return this->m_body; }
std::vector<uint8_t> const& GetBody() const { return this->m_body; }
};
}}} // namespace Azure::Core::Http

View File

@ -13,6 +13,10 @@
namespace Azure { namespace Core { namespace Http {
namespace Details {
constexpr static const char* c_GetStreamForBody = "no-download";
} // namespace Details
class NextHttpPolicy;
class HttpPolicy {
@ -54,6 +58,19 @@ namespace Azure { namespace Core { namespace Http {
std::shared_ptr<HttpTransport> m_transport;
public:
/**
* @brief Creates a context with the required configuration to avoid transport policy to
* download a body payload from http response and return a BodyStream to read from the wire
* instead.
*
* @param ctx parent context to be used to create a context with specific settings
* @return Azure::Core::Context
*/
static Azure::Core::Context DownloadViaStream(Azure::Core::Context& ctx)
{
return ctx.WithValue(Details::c_GetStreamForBody, true);
}
explicit TransportPolicy(std::shared_ptr<HttpTransport> transport)
: m_transport(std::move(transport))
{
@ -65,15 +82,7 @@ namespace Azure { namespace Core { namespace Http {
}
std::unique_ptr<RawResponse> Send(Context& ctx, Request& request, NextHttpPolicy nextHttpPolicy)
const override
{
AZURE_UNREFERENCED_PARAMETER(nextHttpPolicy);
/**
* The transport policy is always the last policy.
* Call the transport and return
*/
return m_transport->Send(ctx, request);
}
const override;
};
struct RetryOptions

View File

@ -6,7 +6,7 @@
using namespace Azure::Core;
using time_point = std::chrono::system_clock::time_point;
Context& GetApplicationContext()
Context& Azure::Core::GetApplicationContext()
{
static Context ctx;
return ctx;

View File

@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include <http/policy.hpp>
using namespace Azure::Core::Http;
std::unique_ptr<RawResponse> TransportPolicy::Send(
Context& ctx,
Request& request,
NextHttpPolicy nextHttpPolicy) const
{
AZURE_UNREFERENCED_PARAMETER(nextHttpPolicy);
/**
* The transport policy is always the last policy.
* Call the transport and return
*/
auto response = m_transport->Send(ctx, request);
if (ctx.HasKey(Details::c_GetStreamForBody))
{ // special case to return a response with BodyStream to read directly from socket
return response;
}
// default behavior for all request is to download body content to Response
// If ReadToEnd fail, retry policy will eventually call this again
auto bodyStream = response->GetBodyStream();
response->SetBody(BodyStream::ReadToEnd(ctx, *bodyStream));
// BodyStream is moved out of response. This makes transport implementation to clean any active
// session with sockets or internal state.
return response;
}

View File

@ -27,7 +27,7 @@ int main()
auto httpPipeline = Http::HttpPipeline(policies);
auto context = Context();
auto context = Azure::Core::GetApplicationContext();
string host("http://anglesharp.azurewebsites.net/Chunked");

View File

@ -31,7 +31,7 @@ int main()
auto httpPipeline = Http::HttpPipeline(policies);
auto context = Context();
auto context = Azure::Core::GetApplicationContext();
// STORAGE_BLOB_WITH_SAS = like
// "https://account.windows.net/azure/container/blob?sv=...&ss=...&..."

View File

@ -51,7 +51,7 @@ int main()
// Add the transport policy
policies.push_back(std::make_unique<TransportPolicy>(std::move(transport)));
auto httpPipeline = Http::HttpPipeline(policies);
auto context = Context();
auto context = Azure::Core::GetApplicationContext();
// Both requests uses a body buffer to be uploaded that would produce responses with bodyBuffer
doHeadRequest(context, httpPipeline);

View File

@ -54,7 +54,7 @@ int main()
auto httpPipeline = Http::HttpPipeline(policies);
std::unique_ptr<Http::RawResponse> response;
auto context = Context();
auto context = Azure::Core::GetApplicationContext();
doGetRequest(context, httpPipeline);
doPutStreamRequest(context, httpPipeline);

View File

@ -2,6 +2,7 @@
// SPDX-License-Identifier: MIT
#include "transport_adapter.hpp"
#include <context.hpp>
#include <response.hpp>
#include <string>
@ -21,15 +22,18 @@ namespace Azure { namespace Core { namespace Test {
= CreatePolicies();
Azure::Core::Http::HttpPipeline TransportAdapter::pipeline(policies);
Azure::Core::Context TransportAdapter::context = Azure::Core::Context();
Azure::Core::Context TransportAdapter::context = Azure::Core::GetApplicationContext();
void TransportAdapter::CheckBodyStreamLength(
Azure::Core::Http::BodyStream& body,
void TransportAdapter::CheckBodyFromBuffer(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody)
{
EXPECT_EQ(body.Length(), size);
auto bodyVector = Azure::Core::Http::BodyStream::ReadToEnd(context, body);
auto body = response.GetBodyStream();
EXPECT_EQ(body, nullptr);
std::vector<uint8_t> bodyVector = response.GetBody();
int64_t bodySize = bodyVector.size();
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
@ -42,6 +46,31 @@ namespace Azure { namespace Core { namespace Test {
}
}
void TransportAdapter::CheckBodyFromStream(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody)
{
auto body = response.GetBodyStream();
EXPECT_NE(body, nullptr);
std::vector<uint8_t> bodyVector = Azure::Core::Http::BodyStream::ReadToEnd(context, *body);
int64_t bodySize = body->Length();
EXPECT_EQ(bodySize, size);
bodySize = bodyVector.size();
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
}
if (expectedBody.size() > 0)
{
auto bodyString = std::string(bodyVector.begin(), bodyVector.end());
EXPECT_STREQ(expectedBody.data(), bodyString.data());
}
} // namespace Test
TEST_F(TransportAdapter, get)
{
std::string host("http://httpbin.org/get");
@ -49,17 +78,15 @@ namespace Azure { namespace Core { namespace Test {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = response->GetBodyStream();
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(*response, expectedResponseBodySize);
// Add a header and send again. RawResponse should return that header in the body
request.AddHeader("123", "456");
response = pipeline.Send(context, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
body = response->GetBodyStream();
// header length is 6 (data) + 13 (formating) -> ` "123": "456"\r\n,`
CheckBodyStreamLength(*body, expectedResponseBodySize + 6 + 13);
CheckBodyFromBuffer(*response, expectedResponseBodySize + 6 + 13);
}
TEST_F(TransportAdapter, getLoop)
@ -74,8 +101,7 @@ namespace Azure { namespace Core { namespace Test {
auto response = pipeline.Send(context, request);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = response->GetBodyStream();
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(*response, expectedResponseBodySize);
}
}
@ -87,8 +113,7 @@ namespace Azure { namespace Core { namespace Test {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Head, host);
auto response = pipeline.Send(context, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = response->GetBodyStream();
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(*response, expectedResponseBodySize);
// Check content-length header to be greater than 0
int64_t contentLengthHeader = std::stoull(response->GetHeaders().at("content-length"));
@ -108,8 +133,7 @@ namespace Azure { namespace Core { namespace Test {
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
auto body = response->GetBodyStream();
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(*response, expectedResponseBodySize);
}
TEST_F(TransportAdapter, deleteRequest)
@ -124,9 +148,8 @@ namespace Azure { namespace Core { namespace Test {
auto response = pipeline.Send(context, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = response->GetBodyStream();
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(*response, expectedResponseBodySize);
}
TEST_F(TransportAdapter, patch)
@ -141,9 +164,8 @@ namespace Azure { namespace Core { namespace Test {
auto response = pipeline.Send(context, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = response->GetBodyStream();
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(*response, expectedResponseBodySize);
}
TEST_F(TransportAdapter, getChunk)
@ -159,9 +181,135 @@ namespace Azure { namespace Core { namespace Test {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = response->GetBodyStream();
CheckBodyStreamLength(*body, expectedResponseBodySize, expectedChunkResponse);
CheckBodyFromBuffer(*response, expectedResponseBodySize, expectedChunkResponse);
}
// **********************
// ***Same tests but getting stream to pull from socket, simulating the Download Op
// **********************
TEST_F(TransportAdapter, getWithStream)
{
std::string host("http://httpbin.org/get");
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
// Add a header and send again. Response should return that header in the body
request.AddHeader("123", "456");
response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
// header length is 6 (data) + 13 (formating) -> ` "123": "456"\r\n,`
CheckBodyFromStream(*response, expectedResponseBodySize + 6 + 13);
}
TEST_F(TransportAdapter, getLoopWithStream)
{
std::string host("http://httpbin.org/get");
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
// loop sending request
for (auto i = 0; i < 20; i++)
{
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
CheckBodyFromStream(*response, expectedResponseBodySize);
}
}
TEST_F(TransportAdapter, headWithStream)
{
std::string host("http://httpbin.org/get");
auto expectedResponseBodySize = 0;
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Head, host);
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
CheckBodyFromStream(*response, expectedResponseBodySize);
// Check content-length header to be greater than 0
int64_t contentLengthHeader = std::stoull(response->GetHeaders().at("content-length"));
EXPECT_TRUE(contentLengthHeader > 0);
}
TEST_F(TransportAdapter, putWithStream)
{
std::string host("http://httpbin.org/put");
// PUT 1MB
auto requestBodyVector = std::vector<uint8_t>(1024 * 1024, 'x');
auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector);
auto request
= Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Put, host, &bodyRequest);
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
}
TEST_F(TransportAdapter, deleteRequestWithStream)
{
std::string host("http://httpbin.org/delete");
// Delete with 1MB payload
auto requestBodyVector = std::vector<uint8_t>(1024 * 1024, 'x');
auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector);
auto request
= Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Delete, host, &bodyRequest);
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
}
TEST_F(TransportAdapter, patchWithStream)
{
std::string host("http://httpbin.org/patch");
// Patch with 1kb payload
auto requestBodyVector = std::vector<uint8_t>(1024, 'x');
auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector);
auto request
= Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Patch, host, &bodyRequest);
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
}
TEST_F(TransportAdapter, getChunkWithStream)
{
std::string host("http://anglesharp.azurewebsites.net/Chunked");
auto expectedResponseBodySize = -1; // chunked will return unknown body length
auto expectedChunkResponse = std::string(
"<!DOCTYPE html>\r\n<html lang=en>\r\n<head>\r\n<meta charset='utf-8'>\r\n<title>Chunked "
"transfer encoding test</title>\r\n</head>\r\n<body><h1>Chunked transfer encoding "
"test</h1><h5>This is a chunked response after 100 ms.</h5><h5>This is a chunked "
"response after 1 second. The server should not close the stream before all chunks are "
"sent to a client.</h5></body></html>");
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context);
auto response = pipeline.Send(ctx, request);
EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
CheckBodyFromStream(*response, expectedResponseBodySize, expectedChunkResponse);
}
TEST_F(TransportAdapter, createResponseT)
@ -176,9 +324,8 @@ namespace Azure { namespace Core { namespace Test {
auto& r = responseT.GetRawResponse();
EXPECT_TRUE(r.GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok);
auto body = r.GetBodyStream();
auto expectedResponseBodySize = std::stoull(r.GetHeaders().at("content-length"));
CheckBodyStreamLength(*body, expectedResponseBodySize);
CheckBodyFromBuffer(r, expectedResponseBodySize);
// Direct access
EXPECT_STREQ((*responseT).data(), expectedType.data());

View File

@ -18,8 +18,13 @@ namespace Azure { namespace Core { namespace Test {
static std::vector<std::unique_ptr<Azure::Core::Http::HttpPolicy>> policies;
static Azure::Core::Context context;
static void CheckBodyStreamLength(
Azure::Core::Http::BodyStream& body,
static void CheckBodyFromBuffer(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody = std::string(""));
static void CheckBodyFromStream(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody = std::string(""));
};