make curl transport adapter to request new connection if sending fails (#1721)

* make curl transport adapter to request new connection if sending fails
This commit is contained in:
Victor Vazquez 2021-03-11 02:02:04 +00:00 committed by GitHub
parent de0cb8a720
commit 33c95b09e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 28 deletions

View File

@ -46,6 +46,7 @@
### Bug Fixes
- Make sure to rewind the body stream at the start of each request retry attempt, including the first.
- Connection pool resets when all connections are closed.
- Fix `Azure::Context` to support unique_ptr.
## 1.0.0-beta.6 (2021-02-09)

View File

@ -15,6 +15,7 @@
#if defined(AZ_PLATFORM_POSIX)
#include <poll.h> // for poll()
#include <sys/socket.h> // for socket shutdown
#elif defined(AZ_PLATFORM_WINDOWS)
#include <winsock2.h> // for WSAPoll();
#endif
@ -163,23 +164,30 @@ std::unique_ptr<RawResponse> CurlTransport::Send(Request& request, Context const
request, CurlConnectionPool::GetCurlConnection(request, m_options), m_options.HttpKeepAlive);
CURLcode performing;
// Try to send the request. If we get CURLE_UNSUPPORTED_PROTOCOL back, it means the connection is
// either closed or the socket is not usable any more. In that case, let the session be destroyed
// and create a new session to get another connection from connection pool.
// Try to send the request. If we get CURLE_UNSUPPORTED_PROTOCOL/CURLE_SEND_ERROR back, it means
// the connection is either closed or the socket is not usable any more. In that case, let the
// session be destroyed and create a new session to get another connection from connection pool.
// Prevent from trying forever by using DefaultMaxOpenNewConnectionIntentsAllowed.
for (auto getConnectionOpenIntent = 0;
getConnectionOpenIntent < _detail::DefaultMaxOpenNewConnectionIntentsAllowed;
getConnectionOpenIntent++)
{
performing = session->Perform(context);
if (performing != CURLE_UNSUPPORTED_PROTOCOL)
if (performing != CURLE_UNSUPPORTED_PROTOCOL && performing != CURLE_SEND_ERROR)
{
break;
}
// Let session be destroyed and create a new one to get a new connection
// Let session be destroyed and request a new connection. If the number of
// request for connection has reached `RequestPoolResetAfterConnectionFailed`, ask the pool to
// clean (remove connections) and create a new one. This is because, keep getting connections
// that fail to perform means a general network disconnection where all connections in the pool
// won't be no longer valid.
session = std::make_unique<CurlSession>(
request,
CurlConnectionPool::GetCurlConnection(request, m_options),
CurlConnectionPool::GetCurlConnection(
request,
m_options,
getConnectionOpenIntent + 1 >= _detail::RequestPoolResetAfterConnectionFailed),
m_options.HttpKeepAlive);
}
@ -746,6 +754,16 @@ int64_t CurlSession::OnRead(uint8_t* buffer, int64_t count, Context const& conte
return totalRead;
}
void CurlConnection::Shutdown()
{
#if defined(AZ_PLATFORM_POSIX)
::shutdown(m_curlSocket, SHUT_RDWR);
#elif defined(AZ_PLATFORM_WINDOWS)
::shutdown(m_curlSocket, SD_BOTH);
#endif
m_isShutDown = true;
}
// Read from socket and return the number of bytes taken from socket
int64_t CurlConnection::ReadFromSocket(uint8_t* buffer, int64_t bufferSize, Context const& context)
{
@ -1052,7 +1070,7 @@ int64_t CurlSession::ResponseBufferParser::BuildHeader(
std::mutex CurlConnectionPool::ConnectionPoolMutex;
std::map<std::string, std::list<std::unique_ptr<CurlNetworkConnection>>>
CurlConnectionPool::ConnectionPoolIndex;
int32_t CurlConnectionPool::s_connectionCounter = 0;
uint64_t CurlConnectionPool::s_connectionCounter = 0;
bool CurlConnectionPool::s_isCleanConnectionsRunning = false;
namespace {
@ -1097,7 +1115,8 @@ inline std::string GetConnectionKey(std::string const& host, CurlTransportOption
std::unique_ptr<CurlNetworkConnection> CurlConnectionPool::GetCurlConnection(
Request& request,
CurlTransportOptions const& options)
CurlTransportOptions const& options,
bool resetPool)
{
std::string const& host = request.GetUrl().GetHost();
std::string const connectionKey = GetConnectionKey(host, options);
@ -1109,26 +1128,36 @@ std::unique_ptr<CurlNetworkConnection> CurlConnectionPool::GetCurlConnection(
// get a ref to the pool from the map of pools
auto hostPoolIndex = CurlConnectionPool::ConnectionPoolIndex.find(connectionKey);
if (hostPoolIndex != CurlConnectionPool::ConnectionPoolIndex.end()
&& hostPoolIndex->second.size() > 0)
{
// get ref to first connection
auto fistConnectionIterator = hostPoolIndex->second.begin();
// move the connection ref to temp ref
auto connection = std::move(*fistConnectionIterator);
// Remove the connection ref from list
hostPoolIndex->second.erase(fistConnectionIterator);
// reduce number of connections on the pool
CurlConnectionPool::s_connectionCounter -= 1;
// Remove index if there are no more connections
if (hostPoolIndex->second.size() == 0)
if (resetPool)
{
CurlConnectionPool::ConnectionPoolIndex.erase(hostPoolIndex);
// Remove all connections for the connection Key and move to spawn new connection below
CurlConnectionPool::s_connectionCounter -= hostPoolIndex->second.size();
hostPoolIndex->second.clear();
}
else
{
// get ref to first connection
auto fistConnectionIterator = hostPoolIndex->second.begin();
// move the connection ref to temp ref
auto connection = std::move(*fistConnectionIterator);
// Remove the connection ref from list
hostPoolIndex->second.erase(fistConnectionIterator);
// reduce number of connections on the pool
CurlConnectionPool::s_connectionCounter -= 1;
// return connection ref
return connection;
// Remove index if there are no more connections
if (hostPoolIndex->second.size() == 0)
{
CurlConnectionPool::ConnectionPoolIndex.erase(hostPoolIndex);
}
// return connection ref
return connection;
}
}
}
@ -1249,6 +1278,12 @@ void CurlConnectionPool::MoveConnectionBackToPool(
return;
}
if (connection->IsShutdown())
{
// Can't re-used a shut down connection
return;
}
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
auto& poolId = connection->GetConnectionKey();
@ -1306,7 +1341,7 @@ void CurlConnectionPool::CleanUp()
// size() > 0 so we are safe to go end() - 1 and find the last element in the
// list
connection--;
if (connection->get()->isExpired())
if (connection->get()->IsExpired())
{
// remove connection from the pool and update the connection to the next one
// which is going to be list.end()

View File

@ -64,12 +64,17 @@ namespace Azure { namespace Core { namespace Http {
* @remark If there is not any available connection, a new connection is created.
*
* @param request HTTP request to get #Azure::Core::Http::CurlNetworkConnection for.
* @param options The connection settings which includes host name and libcurl handle specific
* configuration.
* @param resetPool Request the pool to remove all current connections for the provided options
* to force the creation of a new connection.
*
* @return #Azure::Core::Http::CurlNetworkConnection to use.
*/
static std::unique_ptr<CurlNetworkConnection> GetCurlConnection(
Request& request,
CurlTransportOptions const& options);
CurlTransportOptions const& options,
bool resetPool = false);
/**
* @brief Moves a connection back to the pool to be re-used.
@ -91,7 +96,7 @@ namespace Azure { namespace Core { namespace Http {
*/
static void CleanUp();
AZ_CORE_DLLEXPORT static int32_t s_connectionCounter;
AZ_CORE_DLLEXPORT static uint64_t s_connectionCounter;
AZ_CORE_DLLEXPORT static bool s_isCleanConnectionsRunning;
// Removes all connections and indexes
static void ClearIndex() { CurlConnectionPool::ConnectionPoolIndex.clear(); }

View File

@ -26,6 +26,9 @@ namespace Azure { namespace Core { namespace Http {
constexpr static const char* DefaultFailedToGetNewConnectionTemplate
= "Fail to get a new connection for: ";
constexpr static int DefaultMaxOpenNewConnectionIntentsAllowed = 10;
// After 3 connections are received from the pool and failed to send a request, the next
// connections would ask the pool to be clean and spawn new connection.
constexpr static int RequestPoolResetAfterConnectionFailed = 3;
// 90 sec -> cleaner wait time before next clean routine
constexpr static int DefaultCleanerIntervalMilliseconds = 1000 * 90;
// 60 sec -> expired connection is when it waits for 60 sec or more and it's not re-used
@ -40,6 +43,9 @@ namespace Azure { namespace Core { namespace Http {
*
*/
class CurlNetworkConnection {
protected:
bool m_isShutDown = false;
public:
/**
* @brief Allow derived classes calling a destructor.
@ -61,7 +67,7 @@ namespace Azure { namespace Core { namespace Http {
/**
* @brief Checks whether this CURL connection is expired.
*/
virtual bool isExpired() = 0;
virtual bool IsExpired() = 0;
/**
* @brief This function is used when working with streams to pull more data from the wire.
@ -77,6 +83,21 @@ namespace Azure { namespace Core { namespace Http {
*/
virtual CURLcode SendBuffer(uint8_t const* buffer, size_t bufferSize, Context const& context)
= 0;
/**
* @brief Set the connection into an invalid and unusable state.
*
* @remark A connection won't be returned to the connection pool if it was shut it down.
*
*/
virtual void Shutdown() { m_isShutDown = true; };
/**
* @brief Check if the the connection was shut it down.
*
* @return `true` is the connection was shut it down.
*/
bool IsShutdown() const { return m_isShutDown; };
};
/**
@ -138,7 +159,7 @@ namespace Azure { namespace Core { namespace Http {
* @brief Checks whether this CURL connection is expired.
* @return `true` if this connection is considered expired, `false` otherwise.
*/
bool isExpired() override
bool IsExpired() override
{
auto connectionOnWaitingTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - this->m_lastUseTime);
@ -170,5 +191,7 @@ namespace Azure { namespace Core { namespace Http {
*/
CURLcode SendBuffer(uint8_t const* buffer, size_t bufferSize, Context const& context)
override;
void Shutdown() override;
};
}}} // namespace Azure::Core::Http

View File

@ -172,5 +172,24 @@ namespace Azure { namespace Core { namespace Test {
#endif
}
TEST(CurlConnectionPool, resiliencyOnConnectionClosed)
{
Azure::Core::Http::Request req(
Azure::Core::Http::HttpMethod::Get, Azure::Core::Http::Url("http://httpbin.org/get"));
Azure::Core::Http::CurlTransportOptions options;
auto connection = Azure::Core::Http::CurlConnectionPool::GetCurlConnection(req, options);
// Simulate connection lost (like server disconnection).
connection->Shutdown();
{
// Check that CURLE_SEND_ERROR is produced when trying to use the connection.
auto session = std::make_unique<Azure::Core::Http::CurlSession>(
req, std::move(connection), options.HttpKeepAlive);
auto r = session->Perform(Azure::Core::Context::GetApplicationContext());
EXPECT_EQ(CURLE_SEND_ERROR, r);
}
}
#endif
}}} // namespace Azure::Core::Test

View File

@ -39,7 +39,7 @@ namespace Azure { namespace Core { namespace Test {
public:
MOCK_METHOD(std::string const&, GetConnectionKey, (), (const, override));
MOCK_METHOD(void, updateLastUsageTime, (), (override));
MOCK_METHOD(bool, isExpired, (), (override));
MOCK_METHOD(bool, IsExpired, (), (override));
MOCK_METHOD(
int64_t,
ReadFromSocket,