From 33c95b09e93beccb23f1d84bd9aff752580b49d3 Mon Sep 17 00:00:00 2001 From: Victor Vazquez Date: Thu, 11 Mar 2021 02:02:04 +0000 Subject: [PATCH] make curl transport adapter to request new connection if sending fails (#1721) * make curl transport adapter to request new connection if sending fails --- sdk/core/azure-core/CHANGELOG.md | 1 + sdk/core/azure-core/src/http/curl/curl.cpp | 81 +++++++++++++------ .../curl/curl_connection_pool_private.hpp | 9 ++- .../src/http/curl/curl_connection_private.hpp | 27 ++++++- .../test/ut/curl_connection_pool.cpp | 19 +++++ sdk/core/azure-core/test/ut/curl_session.hpp | 2 +- 6 files changed, 111 insertions(+), 28 deletions(-) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 6ba433b54..ba6fef5ae 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -46,6 +46,7 @@ ### Bug Fixes - Make sure to rewind the body stream at the start of each request retry attempt, including the first. +- Connection pool resets when all connections are closed. - Fix `Azure::Context` to support unique_ptr. ## 1.0.0-beta.6 (2021-02-09) diff --git a/sdk/core/azure-core/src/http/curl/curl.cpp b/sdk/core/azure-core/src/http/curl/curl.cpp index afba6a005..43763234d 100644 --- a/sdk/core/azure-core/src/http/curl/curl.cpp +++ b/sdk/core/azure-core/src/http/curl/curl.cpp @@ -15,6 +15,7 @@ #if defined(AZ_PLATFORM_POSIX) #include // for poll() +#include // for socket shutdown #elif defined(AZ_PLATFORM_WINDOWS) #include // for WSAPoll(); #endif @@ -163,23 +164,30 @@ std::unique_ptr CurlTransport::Send(Request& request, Context const request, CurlConnectionPool::GetCurlConnection(request, m_options), m_options.HttpKeepAlive); CURLcode performing; - // Try to send the request. If we get CURLE_UNSUPPORTED_PROTOCOL back, it means the connection is - // either closed or the socket is not usable any more. In that case, let the session be destroyed - // and create a new session to get another connection from connection pool. + // Try to send the request. If we get CURLE_UNSUPPORTED_PROTOCOL/CURLE_SEND_ERROR back, it means + // the connection is either closed or the socket is not usable any more. In that case, let the + // session be destroyed and create a new session to get another connection from connection pool. // Prevent from trying forever by using DefaultMaxOpenNewConnectionIntentsAllowed. for (auto getConnectionOpenIntent = 0; getConnectionOpenIntent < _detail::DefaultMaxOpenNewConnectionIntentsAllowed; getConnectionOpenIntent++) { performing = session->Perform(context); - if (performing != CURLE_UNSUPPORTED_PROTOCOL) + if (performing != CURLE_UNSUPPORTED_PROTOCOL && performing != CURLE_SEND_ERROR) { break; } - // Let session be destroyed and create a new one to get a new connection + // Let session be destroyed and request a new connection. If the number of + // request for connection has reached `RequestPoolResetAfterConnectionFailed`, ask the pool to + // clean (remove connections) and create a new one. This is because, keep getting connections + // that fail to perform means a general network disconnection where all connections in the pool + // won't be no longer valid. session = std::make_unique( request, - CurlConnectionPool::GetCurlConnection(request, m_options), + CurlConnectionPool::GetCurlConnection( + request, + m_options, + getConnectionOpenIntent + 1 >= _detail::RequestPoolResetAfterConnectionFailed), m_options.HttpKeepAlive); } @@ -746,6 +754,16 @@ int64_t CurlSession::OnRead(uint8_t* buffer, int64_t count, Context const& conte return totalRead; } +void CurlConnection::Shutdown() +{ +#if defined(AZ_PLATFORM_POSIX) + ::shutdown(m_curlSocket, SHUT_RDWR); +#elif defined(AZ_PLATFORM_WINDOWS) + ::shutdown(m_curlSocket, SD_BOTH); +#endif + m_isShutDown = true; +} + // Read from socket and return the number of bytes taken from socket int64_t CurlConnection::ReadFromSocket(uint8_t* buffer, int64_t bufferSize, Context const& context) { @@ -1052,7 +1070,7 @@ int64_t CurlSession::ResponseBufferParser::BuildHeader( std::mutex CurlConnectionPool::ConnectionPoolMutex; std::map>> CurlConnectionPool::ConnectionPoolIndex; -int32_t CurlConnectionPool::s_connectionCounter = 0; +uint64_t CurlConnectionPool::s_connectionCounter = 0; bool CurlConnectionPool::s_isCleanConnectionsRunning = false; namespace { @@ -1097,7 +1115,8 @@ inline std::string GetConnectionKey(std::string const& host, CurlTransportOption std::unique_ptr CurlConnectionPool::GetCurlConnection( Request& request, - CurlTransportOptions const& options) + CurlTransportOptions const& options, + bool resetPool) { std::string const& host = request.GetUrl().GetHost(); std::string const connectionKey = GetConnectionKey(host, options); @@ -1109,26 +1128,36 @@ std::unique_ptr CurlConnectionPool::GetCurlConnection( // get a ref to the pool from the map of pools auto hostPoolIndex = CurlConnectionPool::ConnectionPoolIndex.find(connectionKey); + if (hostPoolIndex != CurlConnectionPool::ConnectionPoolIndex.end() && hostPoolIndex->second.size() > 0) { - // get ref to first connection - auto fistConnectionIterator = hostPoolIndex->second.begin(); - // move the connection ref to temp ref - auto connection = std::move(*fistConnectionIterator); - // Remove the connection ref from list - hostPoolIndex->second.erase(fistConnectionIterator); - // reduce number of connections on the pool - CurlConnectionPool::s_connectionCounter -= 1; - - // Remove index if there are no more connections - if (hostPoolIndex->second.size() == 0) + if (resetPool) { - CurlConnectionPool::ConnectionPoolIndex.erase(hostPoolIndex); + // Remove all connections for the connection Key and move to spawn new connection below + CurlConnectionPool::s_connectionCounter -= hostPoolIndex->second.size(); + hostPoolIndex->second.clear(); } + else + { + // get ref to first connection + auto fistConnectionIterator = hostPoolIndex->second.begin(); + // move the connection ref to temp ref + auto connection = std::move(*fistConnectionIterator); + // Remove the connection ref from list + hostPoolIndex->second.erase(fistConnectionIterator); + // reduce number of connections on the pool + CurlConnectionPool::s_connectionCounter -= 1; - // return connection ref - return connection; + // Remove index if there are no more connections + if (hostPoolIndex->second.size() == 0) + { + CurlConnectionPool::ConnectionPoolIndex.erase(hostPoolIndex); + } + + // return connection ref + return connection; + } } } @@ -1249,6 +1278,12 @@ void CurlConnectionPool::MoveConnectionBackToPool( return; } + if (connection->IsShutdown()) + { + // Can't re-used a shut down connection + return; + } + // Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope std::lock_guard lock(CurlConnectionPool::ConnectionPoolMutex); auto& poolId = connection->GetConnectionKey(); @@ -1306,7 +1341,7 @@ void CurlConnectionPool::CleanUp() // size() > 0 so we are safe to go end() - 1 and find the last element in the // list connection--; - if (connection->get()->isExpired()) + if (connection->get()->IsExpired()) { // remove connection from the pool and update the connection to the next one // which is going to be list.end() diff --git a/sdk/core/azure-core/src/http/curl/curl_connection_pool_private.hpp b/sdk/core/azure-core/src/http/curl/curl_connection_pool_private.hpp index 329a1d2fd..c67f44716 100644 --- a/sdk/core/azure-core/src/http/curl/curl_connection_pool_private.hpp +++ b/sdk/core/azure-core/src/http/curl/curl_connection_pool_private.hpp @@ -64,12 +64,17 @@ namespace Azure { namespace Core { namespace Http { * @remark If there is not any available connection, a new connection is created. * * @param request HTTP request to get #Azure::Core::Http::CurlNetworkConnection for. + * @param options The connection settings which includes host name and libcurl handle specific + * configuration. + * @param resetPool Request the pool to remove all current connections for the provided options + * to force the creation of a new connection. * * @return #Azure::Core::Http::CurlNetworkConnection to use. */ static std::unique_ptr GetCurlConnection( Request& request, - CurlTransportOptions const& options); + CurlTransportOptions const& options, + bool resetPool = false); /** * @brief Moves a connection back to the pool to be re-used. @@ -91,7 +96,7 @@ namespace Azure { namespace Core { namespace Http { */ static void CleanUp(); - AZ_CORE_DLLEXPORT static int32_t s_connectionCounter; + AZ_CORE_DLLEXPORT static uint64_t s_connectionCounter; AZ_CORE_DLLEXPORT static bool s_isCleanConnectionsRunning; // Removes all connections and indexes static void ClearIndex() { CurlConnectionPool::ConnectionPoolIndex.clear(); } diff --git a/sdk/core/azure-core/src/http/curl/curl_connection_private.hpp b/sdk/core/azure-core/src/http/curl/curl_connection_private.hpp index 60174ceb9..0c11cec64 100644 --- a/sdk/core/azure-core/src/http/curl/curl_connection_private.hpp +++ b/sdk/core/azure-core/src/http/curl/curl_connection_private.hpp @@ -26,6 +26,9 @@ namespace Azure { namespace Core { namespace Http { constexpr static const char* DefaultFailedToGetNewConnectionTemplate = "Fail to get a new connection for: "; constexpr static int DefaultMaxOpenNewConnectionIntentsAllowed = 10; + // After 3 connections are received from the pool and failed to send a request, the next + // connections would ask the pool to be clean and spawn new connection. + constexpr static int RequestPoolResetAfterConnectionFailed = 3; // 90 sec -> cleaner wait time before next clean routine constexpr static int DefaultCleanerIntervalMilliseconds = 1000 * 90; // 60 sec -> expired connection is when it waits for 60 sec or more and it's not re-used @@ -40,6 +43,9 @@ namespace Azure { namespace Core { namespace Http { * */ class CurlNetworkConnection { + protected: + bool m_isShutDown = false; + public: /** * @brief Allow derived classes calling a destructor. @@ -61,7 +67,7 @@ namespace Azure { namespace Core { namespace Http { /** * @brief Checks whether this CURL connection is expired. */ - virtual bool isExpired() = 0; + virtual bool IsExpired() = 0; /** * @brief This function is used when working with streams to pull more data from the wire. @@ -77,6 +83,21 @@ namespace Azure { namespace Core { namespace Http { */ virtual CURLcode SendBuffer(uint8_t const* buffer, size_t bufferSize, Context const& context) = 0; + + /** + * @brief Set the connection into an invalid and unusable state. + * + * @remark A connection won't be returned to the connection pool if it was shut it down. + * + */ + virtual void Shutdown() { m_isShutDown = true; }; + + /** + * @brief Check if the the connection was shut it down. + * + * @return `true` is the connection was shut it down. + */ + bool IsShutdown() const { return m_isShutDown; }; }; /** @@ -138,7 +159,7 @@ namespace Azure { namespace Core { namespace Http { * @brief Checks whether this CURL connection is expired. * @return `true` if this connection is considered expired, `false` otherwise. */ - bool isExpired() override + bool IsExpired() override { auto connectionOnWaitingTimeMs = std::chrono::duration_cast( std::chrono::steady_clock::now() - this->m_lastUseTime); @@ -170,5 +191,7 @@ namespace Azure { namespace Core { namespace Http { */ CURLcode SendBuffer(uint8_t const* buffer, size_t bufferSize, Context const& context) override; + + void Shutdown() override; }; }}} // namespace Azure::Core::Http diff --git a/sdk/core/azure-core/test/ut/curl_connection_pool.cpp b/sdk/core/azure-core/test/ut/curl_connection_pool.cpp index 86ab67a0e..5efa5f7c7 100644 --- a/sdk/core/azure-core/test/ut/curl_connection_pool.cpp +++ b/sdk/core/azure-core/test/ut/curl_connection_pool.cpp @@ -172,5 +172,24 @@ namespace Azure { namespace Core { namespace Test { #endif } + TEST(CurlConnectionPool, resiliencyOnConnectionClosed) + { + Azure::Core::Http::Request req( + Azure::Core::Http::HttpMethod::Get, Azure::Core::Http::Url("http://httpbin.org/get")); + + Azure::Core::Http::CurlTransportOptions options; + auto connection = Azure::Core::Http::CurlConnectionPool::GetCurlConnection(req, options); + // Simulate connection lost (like server disconnection). + connection->Shutdown(); + + { + // Check that CURLE_SEND_ERROR is produced when trying to use the connection. + auto session = std::make_unique( + req, std::move(connection), options.HttpKeepAlive); + auto r = session->Perform(Azure::Core::Context::GetApplicationContext()); + EXPECT_EQ(CURLE_SEND_ERROR, r); + } + } + #endif }}} // namespace Azure::Core::Test diff --git a/sdk/core/azure-core/test/ut/curl_session.hpp b/sdk/core/azure-core/test/ut/curl_session.hpp index 4fcbbf1e6..e386287e6 100644 --- a/sdk/core/azure-core/test/ut/curl_session.hpp +++ b/sdk/core/azure-core/test/ut/curl_session.hpp @@ -39,7 +39,7 @@ namespace Azure { namespace Core { namespace Test { public: MOCK_METHOD(std::string const&, GetConnectionKey, (), (const, override)); MOCK_METHOD(void, updateLastUsageTime, (), (override)); - MOCK_METHOD(bool, isExpired, (), (override)); + MOCK_METHOD(bool, IsExpired, (), (override)); MOCK_METHOD( int64_t, ReadFromSocket,