From 6e33262be5d74177d40f443af24317719209ab7b Mon Sep 17 00:00:00 2001 From: Victor Vazquez Date: Fri, 28 Aug 2020 05:45:33 +0000 Subject: [PATCH] Fix for connection re-use on response error (#548) * Fix for connection re-use on response error --- .../inc/azure/core/http/curl/curl.hpp | 15 +- sdk/core/azure-core/src/http/curl/curl.cpp | 67 ++++-- sdk/core/azure-core/test/ut/CMakeLists.txt | 2 +- .../azure-core/test/ut/transport_adapter.cpp | 199 ++++++++++-------- ....cpp => transport_adapter_file_upload.cpp} | 8 +- 5 files changed, 176 insertions(+), 115 deletions(-) rename sdk/core/azure-core/test/ut/{file_upload.cpp => transport_adapter_file_upload.cpp} (96%) diff --git a/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp b/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp index ec8ec4928..f3ba91833 100644 --- a/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp +++ b/sdk/core/azure-core/inc/azure/core/http/curl/curl.hpp @@ -93,7 +93,9 @@ namespace Azure { namespace Core { namespace Http { /** * Moves a connection back to the pool to be re-used */ - static void MoveConnectionBackToPool(std::unique_ptr connection); + static void MoveConnectionBackToPool( + std::unique_ptr connection, + Http::HttpStatusCode lastStatusCode); // Class can't have instances. CurlConnectionPool() = delete; @@ -107,6 +109,8 @@ namespace Azure { namespace Core { namespace Http { static int32_t s_connectionCounter; static bool s_isCleanConnectionsRunning; + // Removes all connections and indexes + static void ClearIndex() { CurlConnectionPool::s_connectionPoolIndex.clear(); } // Makes possible to know the number of current connections in the connection pool for an // index @@ -427,7 +431,7 @@ namespace Azure { namespace Core { namespace Http { * * @return CURL_OK when an HTTP response is created. */ - void ReadStatusLineAndHeadersFromRawResponse(); + void ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUffer = false); /** * @brief Reads from inner buffer or from Wire until chunkSize is parsed and converted to @@ -448,6 +452,8 @@ namespace Azure { namespace Core { namespace Http { */ int64_t ReadFromSocket(uint8_t* buffer, int64_t bufferSize); + Http::HttpStatusCode m_lastStatusCode; + bool IsEOF() { return this->m_isChunkedResponseType ? this->m_chunkSize == 0 @@ -477,9 +483,10 @@ namespace Azure { namespace Core { namespace Http { // in the wire. // By not moving the connection back to the pool, it gets destroyed calling the connection // destructor to clean libcurl handle and close the connection. - if (IsEOF()) + if (this->IsEOF()) { - CurlConnectionPool::MoveConnectionBackToPool(std::move(this->m_connection)); + CurlConnectionPool::MoveConnectionBackToPool( + std::move(this->m_connection), this->m_lastStatusCode); } } diff --git a/sdk/core/azure-core/src/http/curl/curl.cpp b/sdk/core/azure-core/src/http/curl/curl.cpp index d4cc27e3a..18058f750 100644 --- a/sdk/core/azure-core/src/http/curl/curl.cpp +++ b/sdk/core/azure-core/src/http/curl/curl.cpp @@ -109,11 +109,19 @@ CURLcode CurlSession::Perform(Context const& context) // Check server response from Expect:100-continue for PUT; // This help to prevent us from start uploading data when Server can't handle it - if (this->m_response->GetStatusCode() != HttpStatusCode::Continue) + if (this->m_lastStatusCode != HttpStatusCode::Continue) { return result; // Won't upload. } + if (this->m_bodyStartInBuffer > 0) + { + // If internal buffer has more data after the 100-continue means Server return an error. + // We don't need to upload body, just parse the response from Server and return + ReadStatusLineAndHeadersFromRawResponse(true); + return result; + } + // Start upload result = this->UploadBody(context); if (result != CURLE_OK) @@ -334,7 +342,7 @@ void CurlSession::ParseChunkSize() } // Read status line plus headers to create a response with no body -void CurlSession::ReadStatusLineAndHeadersFromRawResponse() +void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUffer) { auto parser = ResponseBufferParser(); auto bufferSize = int64_t(); @@ -342,12 +350,27 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse() // Keep reading until all headers were read while (!parser.IsParseCompleted()) { - // Try to fill internal buffer from socket. - // If response is smaller than buffer, we will get back the size of the response - bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); - - // returns the number of bytes parsed up to the body Start - auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast(bufferSize)); + int64_t bytesParsed = 0; + if (reUseInternalBUffer) + { + // parse from internal buffer. This means previous read from server got more than one response. + // This happens when Server returns a 100-continue plus an error code + bufferSize = this->m_innerBufferSize - this->m_bodyStartInBuffer; + bytesParsed = parser.Parse( + this->m_readBuffer + this->m_bodyStartInBuffer, static_cast(bufferSize)); + // if parsing from internal buffer is not enough, do next read from wire + reUseInternalBUffer = false; + // reset body start + this->m_bodyStartInBuffer = -1; + } + else + { + // Try to fill internal buffer from socket. + // If response is smaller than buffer, we will get back the size of the response + bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize); + // returns the number of bytes parsed up to the body Start + bytesParsed = parser.Parse(this->m_readBuffer, static_cast(bufferSize)); + } if (bytesParsed < bufferSize) { @@ -357,6 +380,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse() this->m_response = parser.GetResponse(); this->m_innerBufferSize = static_cast(bufferSize); + this->m_lastStatusCode = this->m_response->GetStatusCode(); // For Head request, set the length of body response to 0. // Response will give us content-length as if we were not doing Head saying what would it be the @@ -364,7 +388,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse() // For NoContent status code, also need to set conentLength to 0. // https://github.com/Azure/azure-sdk-for-cpp/issues/406 if (this->m_request.GetMethod() == HttpMethod::Head - || this->m_response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::NoContent) + || this->m_lastStatusCode == Azure::Core::Http::HttpStatusCode::NoContent) { this->m_contentLength = 0; this->m_bodyStartInBuffer = -1; @@ -810,7 +834,7 @@ std::unique_ptr CurlConnectionPool::GetCurlConnection(Request& r CurlConnectionPool::s_connectionCounter -= 1; // Remove index if there are no more connections - if (CurlConnectionPool::s_connectionPoolIndex.size() == 0) + if (hostPoolIndex->second.size() == 0) { CurlConnectionPool::s_connectionPoolIndex.erase(hostPoolIndex); } @@ -830,15 +854,16 @@ std::unique_ptr CurlConnectionPool::GetCurlConnection(Request& r if (result != CURLE_OK) { throw std::runtime_error( - Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". Could not set URL."); + Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". " + + std::string(curl_easy_strerror(result))); } result = curl_easy_setopt(newConnection->GetHandle(), CURLOPT_CONNECT_ONLY, 1L); if (result != CURLE_OK) { throw std::runtime_error( - Details::c_DefaultFailedToGetNewConnectionTemplate + host - + ". Could not set connect only ON."); + Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". " + + std::string(curl_easy_strerror(result))); } // curl_easy_setopt(newConnection->GetHandle(), CURLOPT_VERBOSE, 1L); @@ -849,22 +874,32 @@ std::unique_ptr CurlConnectionPool::GetCurlConnection(Request& r if (result != CURLE_OK) { throw std::runtime_error( - Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". Could not set timeout."); + Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". " + + std::string(curl_easy_strerror(result))); } result = curl_easy_perform(newConnection->GetHandle()); if (result != CURLE_OK) { throw std::runtime_error( - Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". Could not open connection."); + Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". " + + std::string(curl_easy_strerror(result))); } return newConnection; } // Move the connection back to the connection pool. Push it to the front so it becomes the first // connection to be picked next time some one ask for a connection to the pool (LIFO) -void CurlConnectionPool::MoveConnectionBackToPool(std::unique_ptr connection) +void CurlConnectionPool::MoveConnectionBackToPool(std::unique_ptr connection, Http::HttpStatusCode lastStatusCode) { + auto code = static_cast::type>(lastStatusCode); + // laststatusCode = 0 + if (code < 200 || code >= 300) + { + // A hanlder with previos response with Error can't be re-use. + return; + } + // Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope std::lock_guard lock(CurlConnectionPool::s_connectionPoolMutex); auto& hostPool = CurlConnectionPool::s_connectionPoolIndex[connection->GetHost()]; diff --git a/sdk/core/azure-core/test/ut/CMakeLists.txt b/sdk/core/azure-core/test/ut/CMakeLists.txt index f7c5bb5d9..8d1f04412 100644 --- a/sdk/core/azure-core/test/ut/CMakeLists.txt +++ b/sdk/core/azure-core/test/ut/CMakeLists.txt @@ -21,7 +21,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) add_executable ( ${TARGET_NAME} - file_upload.cpp http.cpp logging.cpp main.cpp @@ -30,6 +29,7 @@ add_executable ( telemetry_policy.cpp transport_adapter.cpp transport_adapter.hpp + transport_adapter_file_upload.cpp uuid.cpp context.cpp) diff --git a/sdk/core/azure-core/test/ut/transport_adapter.cpp b/sdk/core/azure-core/test/ut/transport_adapter.cpp index 1a2971ca4..64daa4d13 100644 --- a/sdk/core/azure-core/test/ut/transport_adapter.cpp +++ b/sdk/core/azure-core/test/ut/transport_adapter.cpp @@ -44,96 +44,6 @@ namespace Azure { namespace Core { namespace Test { CheckBodyFromBuffer(*response, expectedResponseBodySize + 6 + 13); } - // multiThread test requires `ConnectionsOnPool` hook which is only available when building - // TESTING_BUILD. This test cases are only built when that case is true.` - TEST_F(TransportAdapter, getMultiThread) - { - std::string host("http://httpbin.org/get"); - - auto threadRoutine = [host]() { - auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); - auto response = pipeline.Send(context, request); - checkResponseCode(response->GetStatusCode()); - auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); - CheckBodyFromBuffer(*response, expectedResponseBodySize); - }; - - std::thread t1(threadRoutine); - std::thread t2(threadRoutine); - t1.join(); - t2.join(); - auto connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"); - - // 2 connections must be available at this point - EXPECT_EQ(connectionsNow, 2); - - std::thread t3(threadRoutine); - std::thread t4(threadRoutine); - std::thread t5(threadRoutine); - t3.join(); - t4.join(); - t5.join(); - connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"); - // Two connections re-used plus one connection created - EXPECT_EQ(connectionsNow, 3); - } - -#ifdef RUN_LONG_UNIT_TESTS - TEST_F(TransportAdapter, ConnectionPoolCleaner) - { - std::string host("http://httpbin.org/get"); - - auto threadRoutine = [host]() { - auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); - auto response = pipeline.Send(context, request); - checkResponseCode(response->GetStatusCode()); - auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); - CheckBodyFromBuffer(*response, expectedResponseBodySize); - }; - - // one index expected from previous tests - EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); - - std::cout - << "Running Connection Pool Cleaner Test. This test takes more than 3 minutes to complete." - << std::endl - << "Add compiler option -DRUN_LONG_UNIT_TESTS=OFF when building if you want to skip this " - "test." - << std::endl; - - // Wait for 100 secs to make sure any previous connection is removed by the cleaner - std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100)); - - std::cout << "First wait time done. Validating state." << std::endl; - - // index is not affected by cleaner. It does not remove index - EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); - // cleaner should have remove connections - EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0); - - // Let cleaner finish - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - - std::thread t1(threadRoutine); - std::thread t2(threadRoutine); - t1.join(); - t2.join(); - - // 2 connections must be available at this point and one index - EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); - EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2); - - // At this point, cleaner should be ON and will clean connections after on second. - // After 5 seconds connection pool should have been cleaned - std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100)); - - std::cout << "Second wait time done. Validating state." << std::endl; - - // EXPECT_EQ(Http::CurlSession::ConnectionsIndexOnPool(), 0); - EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0); - } -#endif - TEST_F(TransportAdapter, get204) { std::string host("http://mt3.google.com/generate_204"); @@ -242,6 +152,115 @@ namespace Azure { namespace Core { namespace Test { CheckBodyFromBuffer(*response, expectedResponseBodySize, expectedChunkResponse); } + TEST_F(TransportAdapter, putErrorResponse) + { + std::string host("http://httpbin.org/get"); + + // Try to make a PUT to a GET url. This will return an error code from server. + // This test makes sure that the connection is not re-used (because it gets closed by server) + // and next request is not hang + for (auto i = 0; i < 10; i++) + { + auto requestBodyVector = std::vector(10, 'x'); + auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector); + auto request + = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Put, host, &bodyRequest); + auto response = pipeline.Send(context, request); + } + } + + // multiThread test requires `ConnectionsOnPool` hook which is only available when building + // TESTING_BUILD. This test cases are only built when that case is true.` + TEST_F(TransportAdapter, getMultiThread) + { + std::string host("http://httpbin.org/get"); + Azure::Core::Http::CurlConnectionPool::ClearIndex(); + + auto threadRoutine = [host]() { + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); + auto response = pipeline.Send(context, request); + checkResponseCode(response->GetStatusCode()); + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + CheckBodyFromBuffer(*response, expectedResponseBodySize); + }; + + std::thread t1(threadRoutine); + std::thread t2(threadRoutine); + t1.join(); + t2.join(); + // wait a few ms for connections to go back to pool. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // 2 connections must be available at this point + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2); + + std::thread t3(threadRoutine); + std::thread t4(threadRoutine); + std::thread t5(threadRoutine); + t3.join(); + t4.join(); + t5.join(); + // wait a few ms for connections to go back to pool. + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + // Two connections re-used plus one connection created + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 3); + } + +#ifdef RUN_LONG_UNIT_TESTS + TEST_F(TransportAdapter, ConnectionPoolCleaner) + { + std::string host("http://httpbin.org/get"); + + auto threadRoutine = [host]() { + auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host); + auto response = pipeline.Send(context, request); + checkResponseCode(response->GetStatusCode()); + auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length")); + CheckBodyFromBuffer(*response, expectedResponseBodySize); + }; + + // one index expected from previous tests + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); + + std::cout + << "Running Connection Pool Cleaner Test. This test takes more than 3 minutes to complete." + << std::endl + << "Add compiler option -DRUN_LONG_UNIT_TESTS=OFF when building if you want to skip this " + "test." + << std::endl; + + // Wait for 100 secs to make sure any previous connection is removed by the cleaner + std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100)); + + std::cout << "First wait time done. Validating state." << std::endl; + + // index is not affected by cleaner. It does not remove index + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); + // cleaner should have remove connections + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0); + + // Let cleaner finish + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + std::thread t1(threadRoutine); + std::thread t2(threadRoutine); + t1.join(); + t2.join(); + + // 2 connections must be available at this point and one index + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1); + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2); + + // At this point, cleaner should be ON and will clean connections after on second. + // After 5 seconds connection pool should have been cleaned + std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100)); + + std::cout << "Second wait time done. Validating state." << std::endl; + + // EXPECT_EQ(Http::CurlSession::ConnectionsIndexOnPool(), 0); + EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0); + } +#endif + // ********************** // ***Same tests but getting stream to pull from socket, simulating the Download Op // ********************** diff --git a/sdk/core/azure-core/test/ut/file_upload.cpp b/sdk/core/azure-core/test/ut/transport_adapter_file_upload.cpp similarity index 96% rename from sdk/core/azure-core/test/ut/file_upload.cpp rename to sdk/core/azure-core/test/ut/transport_adapter_file_upload.cpp index 6b83af6ab..735b7ffd9 100644 --- a/sdk/core/azure-core/test/ut/file_upload.cpp +++ b/sdk/core/azure-core/test/ut/transport_adapter_file_upload.cpp @@ -46,7 +46,7 @@ namespace Azure { namespace Core { namespace Test { if (size > 0) { // only for known body size - EXPECT_EQ(bodyVector.size(), size); + EXPECT_EQ(bodySize, size); } if (expectedBody.size() > 0) @@ -81,7 +81,7 @@ namespace Azure { namespace Core { namespace Test { } } - TEST_F(TransportAdapter, customSizePutFromFile) + TEST_F(TransportAdapter, SizePutFromFile) { std::string host("http://httpbin.org/put"); std::string testDataPath(AZURE_TEST_DATA_PATH); @@ -116,7 +116,7 @@ namespace Azure { namespace Core { namespace Test { } } - TEST_F(TransportAdapter, customSizePutFromFileDefault) + TEST_F(TransportAdapter, SizePutFromFileDefault) { std::string host("http://httpbin.org/put"); std::string testDataPath(AZURE_TEST_DATA_PATH); @@ -150,7 +150,7 @@ namespace Azure { namespace Core { namespace Test { } } - TEST_F(TransportAdapter, customSizePutFromFileBiggerPage) + TEST_F(TransportAdapter, SizePutFromFileBiggerPage) { std::string host("http://httpbin.org/put"); std::string testDataPath(AZURE_TEST_DATA_PATH);