From ef0c530ee0250ed879cb68c37ca11c2b2ad7e249 Mon Sep 17 00:00:00 2001 From: Victor Vazquez Date: Mon, 27 Jul 2020 11:40:40 -0700 Subject: [PATCH] Pipeline/download content on pipeline to let Retry policy to handle read/get issues (#323) make upload chunk size custumizable --- sdk/core/azure-core/CMakeLists.txt | 1 + sdk/core/azure-core/inc/context.hpp | 16 ++ sdk/core/azure-core/inc/http/http.hpp | 4 + sdk/core/azure-core/inc/http/policy.hpp | 27 ++- sdk/core/azure-core/src/context.cpp | 2 +- .../azure-core/src/http/transport_policy.cpp | 31 +++ ...re_core_storage_list_containers_sample.cpp | 2 +- .../e2e/azure_core_storage_test_sample.cpp | 2 +- .../e2e/azure_core_with_curl_bodyBuffer.cpp | 2 +- .../e2e/azure_core_with_curl_bodyStream.cpp | 2 +- .../azure-core/test/ut/transport_adapter.cpp | 193 +++++++++++++++--- .../azure-core/test/ut/transport_adapter.hpp | 9 +- 12 files changed, 252 insertions(+), 39 deletions(-) create mode 100644 sdk/core/azure-core/src/http/transport_policy.cpp diff --git a/sdk/core/azure-core/CMakeLists.txt b/sdk/core/azure-core/CMakeLists.txt index 91225748e..6a71534da 100644 --- a/sdk/core/azure-core/CMakeLists.txt +++ b/sdk/core/azure-core/CMakeLists.txt @@ -25,6 +25,7 @@ add_library ( src/http/request.cpp src/http/raw_response.cpp src/http/retry_policy.cpp + src/http/transport_policy.cpp src/http/telemetry_policy.cpp src/http/url.cpp src/http/winhttp/win_http_transport.cpp diff --git a/sdk/core/azure-core/inc/context.hpp b/sdk/core/azure-core/inc/context.hpp index b14c8333e..17671ed2b 100644 --- a/sdk/core/azure-core/inc/context.hpp +++ b/sdk/core/azure-core/inc/context.hpp @@ -204,6 +204,21 @@ namespace Azure { namespace Core { return empty; } + bool HasKey(const std::string& key) + { + if (!key.empty()) + { + for (auto ptr = m_contextSharedState; ptr; ptr = ptr->Parent) + { + if (ptr->Key == key) + { + return true; + } + } + } + return false; + } + void Cancel() { m_contextSharedState->CancelAt = time_point::min(); } void ThrowIfCanceled() @@ -216,4 +231,5 @@ namespace Azure { namespace Core { } }; + Context& GetApplicationContext(); }} // namespace Azure::Core diff --git a/sdk/core/azure-core/inc/http/http.hpp b/sdk/core/azure-core/inc/http/http.hpp index a5587fe5b..27b1705a0 100644 --- a/sdk/core/azure-core/inc/http/http.hpp +++ b/sdk/core/azure-core/inc/http/http.hpp @@ -285,6 +285,7 @@ namespace Azure { namespace Core { namespace Http { std::map m_headers; std::unique_ptr m_bodyStream; + std::vector m_body; explicit RawResponse( int32_t majorVersion, @@ -313,6 +314,7 @@ namespace Azure { namespace Core { namespace Http { void AddHeader(std::string const& header); void AddHeader(uint8_t const* const begin, uint8_t const* const last); void SetBodyStream(std::unique_ptr stream); + void SetBody(std::vector body) { this->m_body = std::move(body); } // adding getters for version and stream body. Clang will complain on Mac if we have unused // fields in a class @@ -326,6 +328,8 @@ namespace Azure { namespace Core { namespace Http { // If m_bodyStream was moved before. nullpr is returned return std::move(this->m_bodyStream); } + std::vector& GetBody() { return this->m_body; } + std::vector const& GetBody() const { return this->m_body; } }; }}} // namespace Azure::Core::Http diff --git a/sdk/core/azure-core/inc/http/policy.hpp b/sdk/core/azure-core/inc/http/policy.hpp index ae735d2c3..16976bee6 100644 --- a/sdk/core/azure-core/inc/http/policy.hpp +++ b/sdk/core/azure-core/inc/http/policy.hpp @@ -13,6 +13,10 @@ namespace Azure { namespace Core { namespace Http { + namespace Details { + constexpr static const char* c_GetStreamForBody = "no-download"; + } // namespace Details + class NextHttpPolicy; class HttpPolicy { @@ -54,6 +58,19 @@ namespace Azure { namespace Core { namespace Http { std::shared_ptr m_transport; public: + /** + * @brief Creates a context with the required configuration to avoid transport policy to + * download a body payload from http response and return a BodyStream to read from the wire + * instead. + * + * @param ctx parent context to be used to create a context with specific settings + * @return Azure::Core::Context + */ + static Azure::Core::Context DownloadViaStream(Azure::Core::Context& ctx) + { + return ctx.WithValue(Details::c_GetStreamForBody, true); + } + explicit TransportPolicy(std::shared_ptr transport) : m_transport(std::move(transport)) { @@ -65,15 +82,7 @@ namespace Azure { namespace Core { namespace Http { } std::unique_ptr Send(Context& ctx, Request& request, NextHttpPolicy nextHttpPolicy) - const override - { - AZURE_UNREFERENCED_PARAMETER(nextHttpPolicy); - /** - * The transport policy is always the last policy. - * Call the transport and return - */ - return m_transport->Send(ctx, request); - } + const override; }; struct RetryOptions diff --git a/sdk/core/azure-core/src/context.cpp b/sdk/core/azure-core/src/context.cpp index 4e1918ba0..b5cc04f61 100644 --- a/sdk/core/azure-core/src/context.cpp +++ b/sdk/core/azure-core/src/context.cpp @@ -6,7 +6,7 @@ using namespace Azure::Core; using time_point = std::chrono::system_clock::time_point; -Context& GetApplicationContext() +Context& Azure::Core::GetApplicationContext() { static Context ctx; return ctx; diff --git a/sdk/core/azure-core/src/http/transport_policy.cpp b/sdk/core/azure-core/src/http/transport_policy.cpp new file mode 100644 index 000000000..641cca1ec --- /dev/null +++ b/sdk/core/azure-core/src/http/transport_policy.cpp @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// SPDX-License-Identifier: MIT + +#include + +using namespace Azure::Core::Http; + +std::unique_ptr TransportPolicy::Send( + Context& ctx, + Request& request, + NextHttpPolicy nextHttpPolicy) const +{ + AZURE_UNREFERENCED_PARAMETER(nextHttpPolicy); + /** + * The transport policy is always the last policy. + * Call the transport and return + */ + auto response = m_transport->Send(ctx, request); + if (ctx.HasKey(Details::c_GetStreamForBody)) + { // special case to return a response with BodyStream to read directly from socket + return response; + } + + // default behavior for all request is to download body content to Response + // If ReadToEnd fail, retry policy will eventually call this again + auto bodyStream = response->GetBodyStream(); + response->SetBody(BodyStream::ReadToEnd(ctx, *bodyStream)); + // BodyStream is moved out of response. This makes transport implementation to clean any active + // session with sockets or internal state. + return response; +} diff --git a/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp b/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp index b65eb6089..6f08672f3 100644 --- a/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp +++ b/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp @@ -27,7 +27,7 @@ int main() auto httpPipeline = Http::HttpPipeline(policies); - auto context = Context(); + auto context = Azure::Core::GetApplicationContext(); string host("http://anglesharp.azurewebsites.net/Chunked"); diff --git a/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp b/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp index 0cb8a8259..f9782ec62 100644 --- a/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp +++ b/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp @@ -31,7 +31,7 @@ int main() auto httpPipeline = Http::HttpPipeline(policies); - auto context = Context(); + auto context = Azure::Core::GetApplicationContext(); // STORAGE_BLOB_WITH_SAS = like // "https://account.windows.net/azure/container/blob?sv=...&ss=...&..." diff --git a/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp index fbab873ca..30c67796e 100644 --- a/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp +++ b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp @@ -51,7 +51,7 @@ int main() // Add the transport policy policies.push_back(std::make_unique(std::move(transport))); auto httpPipeline = Http::HttpPipeline(policies); - auto context = Context(); + auto context = Azure::Core::GetApplicationContext(); // Both requests uses a body buffer to be uploaded that would produce responses with bodyBuffer doHeadRequest(context, httpPipeline); diff --git a/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp index a9152c96f..d57c1bd50 100644 --- a/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp +++ b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp @@ -54,7 +54,7 @@ int main() auto httpPipeline = Http::HttpPipeline(policies); std::unique_ptr response; - auto context = Context(); + auto context = Azure::Core::GetApplicationContext(); doGetRequest(context, httpPipeline); doPutStreamRequest(context, httpPipeline); diff --git a/sdk/core/azure-core/test/ut/transport_adapter.cpp b/sdk/core/azure-core/test/ut/transport_adapter.cpp index e5eed6626..973a707a2 100644 --- a/sdk/core/azure-core/test/ut/transport_adapter.cpp +++ b/sdk/core/azure-core/test/ut/transport_adapter.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: MIT #include "transport_adapter.hpp" +#include #include #include @@ -21,15 +22,18 @@ namespace Azure { namespace Core { namespace Test { = CreatePolicies(); Azure::Core::Http::HttpPipeline TransportAdapter::pipeline(policies); - Azure::Core::Context TransportAdapter::context = Azure::Core::Context(); + Azure::Core::Context TransportAdapter::context = Azure::Core::GetApplicationContext(); - void TransportAdapter::CheckBodyStreamLength( - Azure::Core::Http::BodyStream& body, + void TransportAdapter::CheckBodyFromBuffer( + Azure::Core::Http::RawResponse& response, int64_t size, std::string expectedBody) { - EXPECT_EQ(body.Length(), size); - auto bodyVector = Azure::Core::Http::BodyStream::ReadToEnd(context, body); + auto body = response.GetBodyStream(); + EXPECT_EQ(body, nullptr); + std::vector bodyVector = response.GetBody(); + int64_t bodySize = bodyVector.size(); + if (size > 0) { // only for known body size EXPECT_EQ(bodyVector.size(), size); @@ -42,6 +46,31 @@ namespace Azure { namespace Core { namespace Test { } } + void TransportAdapter::CheckBodyFromStream( + Azure::Core::Http::RawResponse& response, + int64_t size, + std::string expectedBody) + { + auto body = response.GetBodyStream(); + EXPECT_NE(body, nullptr); + + std::vector bodyVector = Azure::Core::Http::BodyStream::ReadToEnd(context, *body); + int64_t bodySize = body->Length(); + EXPECT_EQ(bodySize, size); + bodySize = bodyVector.size(); + + if (size > 0) + { // only for known body size + EXPECT_EQ(bodyVector.size(), size); + } + + if (expectedBody.size() > 0) + { + auto bodyString = std::string(bodyVector.begin(), bodyVector.end()); + EXPECT_STREQ(expectedBody.data(), bodyString.data()); + } + } // namespace Test + TEST_F(TransportAdapter, get) { std::string host("http://httpbin.org/get"); @@ -49,17 +78,15 @@ namespace Azure { namespace Core { namespace Test { auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); auto response = pipeline.Send(context, request); EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = response->GetBodyStream(); auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(*response, expectedResponseBodySize); // Add a header and send again. RawResponse should return that header in the body request.AddHeader("123", "456"); response = pipeline.Send(context, request); EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - body = response->GetBodyStream(); // header length is 6 (data) + 13 (formating) -> ` "123": "456"\r\n,` - CheckBodyStreamLength(*body, expectedResponseBodySize + 6 + 13); + CheckBodyFromBuffer(*response, expectedResponseBodySize + 6 + 13); } TEST_F(TransportAdapter, getLoop) @@ -74,8 +101,7 @@ namespace Azure { namespace Core { namespace Test { auto response = pipeline.Send(context, request); auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = response->GetBodyStream(); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(*response, expectedResponseBodySize); } } @@ -87,8 +113,7 @@ namespace Azure { namespace Core { namespace Test { auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Head, host); auto response = pipeline.Send(context, request); EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = response->GetBodyStream(); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(*response, expectedResponseBodySize); // Check content-length header to be greater than 0 int64_t contentLengthHeader = std::stoull(response->GetHeaders().at("content-length")); @@ -108,8 +133,7 @@ namespace Azure { namespace Core { namespace Test { EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); - auto body = response->GetBodyStream(); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(*response, expectedResponseBodySize); } TEST_F(TransportAdapter, deleteRequest) @@ -124,9 +148,8 @@ namespace Azure { namespace Core { namespace Test { auto response = pipeline.Send(context, request); EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = response->GetBodyStream(); auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(*response, expectedResponseBodySize); } TEST_F(TransportAdapter, patch) @@ -141,9 +164,8 @@ namespace Azure { namespace Core { namespace Test { auto response = pipeline.Send(context, request); EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = response->GetBodyStream(); auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(*response, expectedResponseBodySize); } TEST_F(TransportAdapter, getChunk) @@ -159,9 +181,135 @@ namespace Azure { namespace Core { namespace Test { auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); auto response = pipeline.Send(context, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = response->GetBodyStream(); - CheckBodyStreamLength(*body, expectedResponseBodySize, expectedChunkResponse); + CheckBodyFromBuffer(*response, expectedResponseBodySize, expectedChunkResponse); + } + + // ********************** + // ***Same tests but getting stream to pull from socket, simulating the Download Op + // ********************** + + TEST_F(TransportAdapter, getWithStream) + { + std::string host("http://httpbin.org/get"); + + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + CheckBodyFromStream(*response, expectedResponseBodySize); + + // Add a header and send again. Response should return that header in the body + request.AddHeader("123", "456"); + response = pipeline.Send(ctx, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + // header length is 6 (data) + 13 (formating) -> ` "123": "456"\r\n,` + CheckBodyFromStream(*response, expectedResponseBodySize + 6 + 13); + } + + TEST_F(TransportAdapter, getLoopWithStream) + { + std::string host("http://httpbin.org/get"); + + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); + + // loop sending request + for (auto i = 0; i < 20; i++) + { + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + CheckBodyFromStream(*response, expectedResponseBodySize); + } + } + + TEST_F(TransportAdapter, headWithStream) + { + std::string host("http://httpbin.org/get"); + auto expectedResponseBodySize = 0; + + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Head, host); + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + CheckBodyFromStream(*response, expectedResponseBodySize); + + // Check content-length header to be greater than 0 + int64_t contentLengthHeader = std::stoull(response->GetHeaders().at("content-length")); + EXPECT_TRUE(contentLengthHeader > 0); + } + + TEST_F(TransportAdapter, putWithStream) + { + std::string host("http://httpbin.org/put"); + + // PUT 1MB + auto requestBodyVector = std::vector(1024 * 1024, 'x'); + auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector); + auto request + = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Put, host, &bodyRequest); + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + + CheckBodyFromStream(*response, expectedResponseBodySize); + } + + TEST_F(TransportAdapter, deleteRequestWithStream) + { + std::string host("http://httpbin.org/delete"); + + // Delete with 1MB payload + auto requestBodyVector = std::vector(1024 * 1024, 'x'); + auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector); + auto request + = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Delete, host, &bodyRequest); + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + CheckBodyFromStream(*response, expectedResponseBodySize); + } + + TEST_F(TransportAdapter, patchWithStream) + { + std::string host("http://httpbin.org/patch"); + + // Patch with 1kb payload + auto requestBodyVector = std::vector(1024, 'x'); + auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector); + auto request + = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Patch, host, &bodyRequest); + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + CheckBodyFromStream(*response, expectedResponseBodySize); + } + + TEST_F(TransportAdapter, getChunkWithStream) + { + std::string host("http://anglesharp.azurewebsites.net/Chunked"); + auto expectedResponseBodySize = -1; // chunked will return unknown body length + auto expectedChunkResponse = std::string( + "\r\n\r\n\r\n\r\nChunked " + "transfer encoding test\r\n\r\n

Chunked transfer encoding " + "test

This is a chunked response after 100 ms.
This is a chunked " + "response after 1 second. The server should not close the stream before all chunks are " + "sent to a client.
"); + + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); + auto ctx = Azure::Core::Http::TransportPolicy::DownloadViaStream(context); + auto response = pipeline.Send(ctx, request); + + EXPECT_TRUE(response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); + CheckBodyFromStream(*response, expectedResponseBodySize, expectedChunkResponse); } TEST_F(TransportAdapter, createResponseT) @@ -176,9 +324,8 @@ namespace Azure { namespace Core { namespace Test { auto& r = responseT.GetRawResponse(); EXPECT_TRUE(r.GetStatusCode() == Azure::Core::Http::HttpStatusCode::Ok); - auto body = r.GetBodyStream(); auto expectedResponseBodySize = std::stoull(r.GetHeaders().at("content-length")); - CheckBodyStreamLength(*body, expectedResponseBodySize); + CheckBodyFromBuffer(r, expectedResponseBodySize); // Direct access EXPECT_STREQ((*responseT).data(), expectedType.data()); diff --git a/sdk/core/azure-core/test/ut/transport_adapter.hpp b/sdk/core/azure-core/test/ut/transport_adapter.hpp index 4c2461dae..7bd5c2037 100644 --- a/sdk/core/azure-core/test/ut/transport_adapter.hpp +++ b/sdk/core/azure-core/test/ut/transport_adapter.hpp @@ -18,8 +18,13 @@ namespace Azure { namespace Core { namespace Test { static std::vector> policies; static Azure::Core::Context context; - static void CheckBodyStreamLength( - Azure::Core::Http::BodyStream& body, + static void CheckBodyFromBuffer( + Azure::Core::Http::RawResponse& response, + int64_t size, + std::string expectedBody = std::string("")); + + static void CheckBodyFromStream( + Azure::Core::Http::RawResponse& response, int64_t size, std::string expectedBody = std::string("")); };