Clean routine (#530)

* adding cleaner routine
This commit is contained in:
Victor Vazquez 2020-08-26 23:51:24 +00:00 committed by GitHub
parent 2c84e3f094
commit 1f0da6fcc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 323 additions and 105 deletions

View File

@ -9,10 +9,14 @@ option(WARNINGS_AS_ERRORS "Treat compiler warnings as errors" ON)
option(BUILD_CURL_TRANSPORT "Build internal http transport implementation with CURL for HTTP Pipeline" OFF)
option(BUILD_TESTING "Build test cases" OFF)
option(BUILD_DOCUMENTATION "Create HTML based API documentation (requires Doxygen)" OFF)
option(RUN_LONG_UNIT_TESTS "Tests that takes more than 5 minutes to complete. No effect if BUILD_TESTING is OFF" OFF)
if(BUILD_TESTING)
# define a symbol that enables some test hooks in code
add_compile_definitions(TESTING_BUILD)
if(RUN_LONG_UNIT_TESTS)
add_compile_definitions(RUN_LONG_UNIT_TESTS)
endif()
endif()
# VCPKG Integration

View File

@ -35,26 +35,26 @@ jobs:
OSVmImage: 'ubuntu-18.04'
VcpkgInstall: 'curl[ssl] libxml2 openssl'
VCPKG_DEFAULT_TRIPLET: 'x64-linux'
CmakeArgs: ' -DBUILD_TESTING=ON'
CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON'
Win_x86_with_unit_test:
OSVmImage: 'windows-2019'
VcpkgInstall: 'curl[winssl] libxml2'
VCPKG_DEFAULT_TRIPLET: 'x86-windows-static'
CMAKE_GENERATOR: 'Visual Studio 16 2019'
CMAKE_GENERATOR_PLATFORM: Win32
CmakeArgs: ' -DBUILD_TESTING=ON'
CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON'
Win_x64_with_unit_test:
OSVmImage: 'windows-2019'
VcpkgInstall: 'curl[winssl] libxml2'
VCPKG_DEFAULT_TRIPLET: 'x64-windows-static'
CMAKE_GENERATOR: 'Visual Studio 16 2019'
CMAKE_GENERATOR_PLATFORM: x64
CmakeArgs: ' -DBUILD_TESTING=ON'
CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON'
MacOS_x64_with_unit_test:
OSVmImage: 'macOS-10.14'
VcpkgInstall: 'curl[ssl] libxml2 openssl'
VCPKG_DEFAULT_TRIPLET: 'x64-osx'
CmakeArgs: ' -DBUILD_TESTING=ON'
CmakeArgs: ' -DBUILD_TESTING=ON -DRUN_LONG_UNIT_TESTS=ON'
pool:
vmImage: $(OSVmImage)
variables:

View File

@ -8,6 +8,7 @@
#include "azure/core/http/http.hpp"
#include "azure/core/http/policy.hpp"
#include <chrono>
#include <curl/curl.h>
#include <list>
#include <map>
@ -15,6 +16,12 @@
#include <type_traits>
#include <vector>
// Define the class name that reads from ConnectionPool private members
namespace Azure { namespace Core { namespace Test {
class TransportAdapter_ConnectionPoolCleaner_Test;
class TransportAdapter_getMultiThread_Test;
}}} // namespace Azure::Core::Test
namespace Azure { namespace Core { namespace Http {
namespace Details {
@ -26,26 +33,17 @@ namespace Azure { namespace Core { namespace Http {
constexpr static const char* c_DefaultFailedToGetNewConnectionTemplate
= "Fail to get a new connection for: ";
constexpr static int c_DefaultMaxOpenNewConnectionIntentsAllowed = 10;
// 90 sec -> cleaner wait time before next clean routine
constexpr static int c_DefaultCleanerIntervalMilliseconds = 1000 * 90;
// 60 sec -> expired connection is when it waits for 60 sec or more and it's not re-used
constexpr static int c_DefaultConnectionExpiredMilliseconds = 1000 * 60;
} // namespace Details
/**
* @brief Statefull component that controls sending an HTTP Request with libcurl thru the wire and
* parsing and building an HTTP RawResponse.
* This session supports the classic libcurl easy interface to send and receive bytes from network
* using callbacks.
* This session also supports working with the custom HTTP protocol option from libcurl to
* manually upload and download bytes using a network socket. This implementation is used when
* working with streams so customers can lazily pull data from netwok using an stream abstraction.
*
* @remarks This component is expected to be used by an HTTP Transporter to ensure that
* transporter to be re usuable in multiple pipelines while every call to network is unique.
*/
class CurlSession : public BodyStream {
// connection handle. It will be taken from a pool
class CurlConnection {
private:
CURL* m_handle;
std::string m_host;
std::chrono::steady_clock::time_point m_lastUseTime;
public:
CurlConnection(std::string const& host) : m_handle(curl_easy_init()), m_host(host) {}
@ -55,9 +53,28 @@ namespace Azure { namespace Core { namespace Http {
CURL* GetHandle() { return this->m_handle; }
std::string GetHost() const { return this->m_host; }
void updateLastUsageTime() { this->m_lastUseTime = std::chrono::steady_clock::now(); }
bool isExpired()
{
auto connectionOnWaitingTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - this->m_lastUseTime);
return connectionOnWaitingTimeMs.count() >= Details::c_DefaultConnectionExpiredMilliseconds;
}
};
// TODO: Mutex for this code to access connectionPoolIndex
struct CurlConnectionPool
{
// Give access to private to this tests class
friend class Azure::Core::Test::TransportAdapter_getMultiThread_Test;
friend class Azure::Core::Test::TransportAdapter_ConnectionPoolCleaner_Test;
/*
* Mutex for accessing connection pool for thread-safe reading and writing
*/
static std::mutex s_connectionPoolMutex;
/*
* Keeps an unique key for each host and creates a connection pool for each key.
* This way getting a connection for an specific host can be done in O(1) instead of looping a
@ -66,13 +83,67 @@ namespace Azure { namespace Core { namespace Http {
* There might be multiple connections for each host.
*/
static std::map<std::string, std::list<std::unique_ptr<CurlConnection>>> s_connectionPoolIndex;
/*
* Finds a connection to be re-used from the connection pool. If there is not any available
* connection, a new connection is created.
*/
static std::unique_ptr<CurlConnection> GetCurlConnection(Request& request);
static void MoveConnectionBackToPool(std::unique_ptr<CurlSession::CurlConnection> connection);
static std::mutex s_connectionPoolMutex;
/**
* Moves a connection back to the pool to be re-used
*/
static void MoveConnectionBackToPool(std::unique_ptr<CurlConnection> connection);
// Class can't have instances.
CurlConnectionPool() = delete;
private:
/**
* @brief Enum used by ResponseBufferParser to control the parsing internal state while building
* Review all connections in the pool and removes old connections that might be already
* expired and closed its connection on server side.
*/
static void CleanUp();
static int32_t s_connectionCounter;
static bool s_isCleanConnectionsRunning;
// Makes possible to know the number of current connections in the connection pool for an
// index
static int64_t ConnectionsOnPool(std::string const& host)
{
auto& pool = CurlConnectionPool::s_connectionPoolIndex[host];
return pool.size();
};
// Makes possible to know the number indexes in the pool
static int64_t ConnectionsIndexOnPool()
{
return CurlConnectionPool::s_connectionPoolIndex.size();
};
};
/**
* @brief Stateful component that controls sending an HTTP Request with libcurl thru the wire
* and parsing and building an HTTP RawResponse. This session supports the classic libcurl easy
* interface to send and receive bytes from network using callbacks. This session also supports
* working with the custom HTTP protocol option from libcurl to manually upload and download
* bytes using a network socket. This implementation is used when working with streams so
* customers can lazily pull data from network using an stream abstraction.
*
* @remarks This component is expected to be used by an HTTP Transporter to ensure that
* transporter to be reusable in multiple pipelines while every call to network is unique.
*/
class CurlSession : public BodyStream {
private:
/*
* Static Connection pool for the application. Multiple CurlSessions will use the connection
* pool for getting a curl handle connection.
*
*/
/*
* Enum used by ResponseBufferParser to control the parsing internal state while building
* the HTTP RawResponse
*
*/
@ -87,13 +158,13 @@ namespace Azure { namespace Core { namespace Http {
* @brief stateful component used to read and parse a buffer to construct a valid HTTP
* RawResponse.
*
* It uses an internal string as buffers to accumulate a response token (version, code, header,
* etc) until the next delimiter is found. Then it uses this string to keep building the HTTP
* RawResponse.
* It uses an internal string as buffers to accumulate a response token (version, code,
* header, etc) until the next delimiter is found. Then it uses this string to keep building
* the HTTP RawResponse.
*
* @remark Only status line and headers are parsed and built. Body is ignored by this component.
* A libcurl session will use this component to build and return the HTTP RawResponse with a
* body stream to the pipeline.
* @remark Only status line and headers are parsed and built. Body is ignored by this
* component. A libcurl session will use this component to build and return the HTTP
* RawResponse with a body stream to the pipeline.
*/
class ResponseBufferParser {
private:
@ -104,8 +175,8 @@ namespace Azure { namespace Core { namespace Http {
ResponseParserState state;
/**
* @brief Unique ptr to a response. Parser will create an Initial-valid HTTP RawResponse and
* then it will append headers to it. This response is moved to a different owner once parsing
* is completed.
* then it will append headers to it. This response is moved to a different owner once
* parsing is completed.
*
*/
std::unique_ptr<RawResponse> m_response;
@ -124,16 +195,17 @@ namespace Azure { namespace Core { namespace Http {
* the token for the HTTP RawResponse is taken from this internal sting if it contains data.
*
* @remark This buffer allows a libcurl session to use any size of buffer to read from a
* socket while constructing an initial valid HTTP RawResponse. No matter if the response from
* wire contains hundreds of headers, we can use only one fixed size buffer to parse it all.
* socket while constructing an initial valid HTTP RawResponse. No matter if the response
* from wire contains hundreds of headers, we can use only one fixed size buffer to parse it
* all.
*
*/
std::string m_internalBuffer;
/**
* @brief This method is invoked by the Parsing process if the internal state is set to status
* code. Function will get the status-line expected tokens until finding the end of status
* line delimiter.
* @brief This method is invoked by the Parsing process if the internal state is set to
* status code. Function will get the status-line expected tokens until finding the end of
* status line delimiter.
*
* @remark When the end of status line delimiter is found, this method will create the HTTP
* RawResponse. The HTTP RawResponse is constructed by default with body type as Stream.
@ -151,8 +223,9 @@ namespace Azure { namespace Core { namespace Http {
*
* @param buffer Points to a memory address with all or some part of a HTTP header.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer. When the returned value
* is smaller than the body size, means there is part of the body response in the buffer.
* @return Returns the index of the last parsed position from buffer. When the returned
* value is smaller than the body size, means there is part of the body response in the
* buffer.
*/
int64_t BuildHeader(uint8_t const* const buffer, int64_t const bufferSize);
@ -170,17 +243,18 @@ namespace Azure { namespace Core { namespace Http {
}
// Parse contents of buffer to construct HttpResponse. Returns the index of the last parsed
// possition. Return bufferSize when all buffer was used to parse
// position. Return bufferSize when all buffer was used to parse
/**
* @brief Parses the content of a buffer to constuct a valid HTTP RawResponse. This method is
* expected to be called over and over until it returns 0, indicating there is nothing more to
* parse to build the HTTP RawResponse.
* @brief Parses the content of a buffer to construct a valid HTTP RawResponse. This method
* is expected to be called over and over until it returns 0, indicating there is nothing
* more to parse to build the HTTP RawResponse.
*
* @param buffer points to a memory area that contains, all or some part of an HTTP response.
* @param buffer points to a memory area that contains, all or some part of an HTTP
* response.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position. Returning a 0 means nothing was
* parsed and it is likely that the HTTP RawResponse is completed. Returning the same value as
* the buffer size means all buffer was parsed and the HTTP might be completed or not.
* parsed and it is likely that the HTTP RawResponse is completed. Returning the same value
* as the buffer size means all buffer was parsed and the HTTP might be completed or not.
* Returning a value smaller than the buffer size will likely indicate that the HTTP
* RawResponse is completed and that the rest of the buffer contains part of the response
* body.
@ -197,8 +271,8 @@ namespace Azure { namespace Core { namespace Http {
/**
* @brief Moves the internal response to a different owner.
*
* @return Will move the response only if parsing is completed and if the HTTP RawResponse was
* not moved before.
* @return Will move the response only if parsing is completed and if the HTTP RawResponse
* was not moved before.
*/
std::unique_ptr<RawResponse> GetResponse()
{
@ -232,8 +306,8 @@ namespace Azure { namespace Core { namespace Http {
Request& m_request;
/**
* @brief Controls the progress of a body buffer upload when using libcurl callbacks. Woks like
* an offset to move the pointer to read the body from the HTTP Request on each callback.
* @brief Controls the progress of a body buffer upload when using libcurl callbacks. Woks
* like an offset to move the pointer to read the body from the HTTP Request on each callback.
*
*/
int64_t m_uploadedBytes;
@ -257,9 +331,9 @@ namespace Azure { namespace Core { namespace Http {
bool m_isChunkedResponseType;
/**
* @brief This is a copy of the value of an HTTP response header `content-length`. The value is
* received as string and parsed to size_t. This field avoid parsing the string header everytime
* from HTTP RawResponse.
* @brief This is a copy of the value of an HTTP response header `content-length`. The value
* is received as string and parsed to size_t. This field avoid parsing the string header
* every time from HTTP RawResponse.
*
* @remark This value is also used to avoid trying to read more data from network than what we
* are expecting to.
@ -268,8 +342,8 @@ namespace Azure { namespace Core { namespace Http {
int64_t m_contentLength;
/**
* @brief For chunked responses, this field knows the size of the current chuck size server will
* de sending
* @brief For chunked responses, this field knows the size of the current chuck size server
* will de sending
*
*/
int64_t m_chunkSize;
@ -285,8 +359,8 @@ namespace Azure { namespace Core { namespace Http {
uint8_t m_readBuffer[Details::c_DefaultLibcurlReaderSize]; // to work with libcurl custom read.
/**
* @brief convenient function that indicates when the HTTP Request will need to upload a payload
* or not.
* @brief convenient function that indicates when the HTTP Request will need to upload a
* payload or not.
*
* @return true if the HTTP Request will need to upload bytes to wire.
*
@ -328,12 +402,12 @@ namespace Azure { namespace Core { namespace Http {
CURLcode SetReadRequest();
/**
* @brief Function used when working with Streams to manually write from the HTTP Request to the
* wire.
* @brief Function used when working with Streams to manually write from the HTTP Request to
* the wire.
*
* @return CURL_OK when response is sent successfully.
*/
CURLcode HttpRawSend(Context const& context);
CURLcode SendRawHttp(Context const& context);
CURLcode UploadBody(Context const& context);
/**
@ -372,7 +446,7 @@ namespace Azure { namespace Core { namespace Http {
* @return return the numbers of bytes pulled from socket. It can be less than what it was
* requested.
*/
int64_t ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize);
int64_t ReadFromSocket(uint8_t* buffer, int64_t bufferSize);
bool IsEOF()
{
@ -381,15 +455,6 @@ namespace Azure { namespace Core { namespace Http {
}
public:
#ifdef TESTING_BUILD
// Makes possible to know the number of current connections in the connection pool
static int64_t s_ConnectionsOnPool(std::string const& host)
{
auto& pool = s_connectionPoolIndex[host];
return pool.size();
};
#endif
/**
* @brief Construct a new Curl Session object. Init internal libcurl handler.
*
@ -397,7 +462,7 @@ namespace Azure { namespace Core { namespace Http {
*/
CurlSession(Request& request) : m_request(request)
{
this->m_connection = GetCurlConnection(this->m_request);
this->m_connection = CurlConnectionPool::GetCurlConnection(this->m_request);
this->m_bodyStartInBuffer = -1;
this->m_innerBufferSize = Details::c_DefaultLibcurlReaderSize;
this->m_isChunkedResponseType = false;
@ -414,7 +479,7 @@ namespace Azure { namespace Core { namespace Http {
// destructor to clean libcurl handle and close the connection.
if (IsEOF())
{
MoveConnectionBackToPool(std::move(this->m_connection));
CurlConnectionPool::MoveConnectionBackToPool(std::move(this->m_connection));
}
}

View File

@ -1,11 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "azure/core/azure.hpp"
#include "azure/core/http/curl/curl.hpp"
#include "azure/core/azure.hpp"
#include "azure/core/http/http.hpp"
#include <string>
#include <thread>
using namespace Azure::Core::Http;
@ -92,7 +93,7 @@ CURLcode CurlSession::Perform(Context const& context)
// Send request. If the connection assigned to this curlSession is closed or the socket is
// somehow lost, libcurl will return CURLE_UNSUPPORTED_PROTOCOL
// (https://curl.haxx.se/libcurl/c/curl_easy_send.html). Return the error back.
result = HttpRawSend(context);
result = SendRawHttp(context);
if (result != CURLE_OK)
{
return result;
@ -265,7 +266,7 @@ CURLcode CurlSession::UploadBody(Context const& context)
}
// custom sending to wire an http request
CURLcode CurlSession::HttpRawSend(Context const& context)
CURLcode CurlSession::SendRawHttp(Context const& context)
{
// something like GET /path HTTP1.0 \r\nheaders\r\n
auto rawRequest = this->m_request.GetHTTPMessagePreBody();
@ -310,7 +311,7 @@ void CurlSession::ParseChunkSize()
if (index + 1 == this->m_innerBufferSize)
{ // on last index. Whatever we read is the BodyStart here
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
else
@ -325,7 +326,7 @@ void CurlSession::ParseChunkSize()
if (keepPolling)
{ // Read all internal buffer and \n was not found, pull from wire
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
}
@ -343,7 +344,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
{
// Try to fill internal buffer from socket.
// If response is smaller than buffer, we will get back the size of the response
bufferSize = ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
// returns the number of bytes parsed up to the body Start
auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
@ -398,7 +399,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
if (this->m_bodyStartInBuffer == -1)
{ // if nothing on inner buffer, pull from wire
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
@ -438,7 +439,7 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer,
else
{ // end of buffer, pull data from wire
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n)
}
}
@ -495,14 +496,14 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer,
// Read from socket when no more data on internal buffer
// For chunk request, read a chunk based on chunk size
totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(readRequestLength));
totalRead = ReadFromSocket(buffer, static_cast<size_t>(readRequestLength));
this->m_sessionTotalRead += totalRead;
return totalRead;
}
// Read from socket and return the number of bytes taken from socket
int64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize)
int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize)
{
// loop until read result is not CURLE_AGAIN
size_t readBytes = 0;
@ -779,29 +780,41 @@ int64_t CurlSession::ResponseBufferParser::BuildHeader(
return indexOfEndOfStatusLine + 1 - buffer;
}
std::mutex CurlSession::s_connectionPoolMutex;
std::map<std::string, std::list<std::unique_ptr<CurlSession::CurlConnection>>>
CurlSession::s_connectionPoolIndex;
std::mutex CurlConnectionPool::s_connectionPoolMutex;
std::map<std::string, std::list<std::unique_ptr<CurlConnection>>>
CurlConnectionPool::s_connectionPoolIndex;
int32_t CurlConnectionPool::s_connectionCounter = 0;
bool CurlConnectionPool::s_isCleanConnectionsRunning = false;
std::unique_ptr<CurlSession::CurlConnection> CurlSession::GetCurlConnection(Request& request)
std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& request)
{
std::string const& host = request.GetHost();
{
// Critical section. Needs to own s_connectionPoolMutex before executing
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(s_connectionPoolMutex);
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
// get a ref to the pool from the map of pools
auto& hostPool = s_connectionPoolIndex[host];
if (hostPool.size() > 0)
auto hostPoolIndex = CurlConnectionPool::s_connectionPoolIndex.find(host);
if (hostPoolIndex != CurlConnectionPool::s_connectionPoolIndex.end()
&& hostPoolIndex->second.size() > 0)
{
// get ref to first connection
auto fistConnectionIterator = hostPool.begin();
auto fistConnectionIterator = hostPoolIndex->second.begin();
// move the connection ref to temp ref
auto connection = std::move(*fistConnectionIterator);
// Remove the connection ref from list
hostPool.erase(fistConnectionIterator);
hostPoolIndex->second.erase(fistConnectionIterator);
// reduce number of connections on the pool
CurlConnectionPool::s_connectionCounter -= 1;
// Remove index if there are no more connections
if (CurlConnectionPool::s_connectionPoolIndex.size() == 0)
{
CurlConnectionPool::s_connectionPoolIndex.erase(hostPoolIndex);
}
// return connection ref
return connection;
}
@ -850,10 +863,88 @@ std::unique_ptr<CurlSession::CurlConnection> CurlSession::GetCurlConnection(Requ
// Move the connection back to the connection pool. Push it to the front so it becomes the first
// connection to be picked next time some one ask for a connection to the pool (LIFO)
void CurlSession::MoveConnectionBackToPool(std::unique_ptr<CurlSession::CurlConnection> connection)
void CurlConnectionPool::MoveConnectionBackToPool(std::unique_ptr<CurlConnection> connection)
{
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(s_connectionPoolMutex);
auto& hostPool = s_connectionPoolIndex[connection->GetHost()];
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
auto& hostPool = CurlConnectionPool::s_connectionPoolIndex[connection->GetHost()];
// update the time when connection was moved back to pool
connection->updateLastUsageTime();
hostPool.push_front(std::move(connection));
CurlConnectionPool::s_connectionCounter += 1;
// Check if there's no cleaner running and started
if (!CurlConnectionPool::s_isCleanConnectionsRunning)
{
CurlConnectionPool::s_isCleanConnectionsRunning = true;
CurlConnectionPool::CleanUp();
}
}
// spawn a thread for cleaning old connections.
// Thread will keep running while there are at least one connection in the pool
void CurlConnectionPool::CleanUp()
{
std::thread backgroundCleanerThread([]() {
for (;;)
{
// wait before trying to clean
std::this_thread::sleep_for(
std::chrono::milliseconds(Details::c_DefaultCleanerIntervalMilliseconds));
{
// take mutex for reading the pool
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
if (CurlConnectionPool::s_connectionCounter == 0)
{
// stop the cleaner since there are no connections
CurlConnectionPool::s_isCleanConnectionsRunning = false;
return;
}
// loop the connection pool index
for (auto index = CurlConnectionPool::s_connectionPoolIndex.begin();
index != CurlConnectionPool::s_connectionPoolIndex.end();
index++)
{
if (index->second.size() == 0)
{
// Move the next pool index
continue;
}
// Pool index with waiting connections. Loop the connection pool backwards until
// a connection that is not expired is found or until all connections are removed.
for (auto connection = index->second.end();;)
{
// loop starts at end(), go back to previous possition. We know the list is size() > 0
// so we are safe to go end() - 1 and find the last element in the list
connection--;
if (connection->get()->isExpired())
{
// remove connection from the pool and update the connection to the next one which
// is going to be list.end()
connection = index->second.erase(connection);
CurlConnectionPool::s_connectionCounter -= 1;
// Connection removed, break if there are no more connections to check
if (index->second.size() == 0)
{
break;
}
}
else
{
// Got a non-expired connection, all connections before this one are not expired.
// Break the loop and continue looping the Pool index
break;
}
}
}
}
}
});
// let thread run independent. It will be done once ther is not connections in the pool
backgroundCleanerThread.detach();
}

View File

@ -4,6 +4,7 @@
#include "transport_adapter.hpp"
#include <azure/core/context.hpp>
#include <azure/core/response.hpp>
#include <iostream>
#include <string>
#include <thread>
@ -43,7 +44,7 @@ namespace Azure { namespace Core { namespace Test {
CheckBodyFromBuffer(*response, expectedResponseBodySize + 6 + 13);
}
// multiThread test requires `s_ConnectionsOnPool` hook which is only available when building
// multiThread test requires `ConnectionsOnPool` hook which is only available when building
// TESTING_BUILD. This test cases are only built when that case is true.`
TEST_F(TransportAdapter, getMultiThread)
{
@ -61,7 +62,8 @@ namespace Azure { namespace Core { namespace Test {
std::thread t2(threadRoutine);
t1.join();
t2.join();
auto connectionsNow = Http::CurlSession::s_ConnectionsOnPool("httpbin.org");
auto connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org");
// 2 connections must be available at this point
EXPECT_EQ(connectionsNow, 2);
@ -71,11 +73,67 @@ namespace Azure { namespace Core { namespace Test {
t3.join();
t4.join();
t5.join();
connectionsNow = Http::CurlSession::s_ConnectionsOnPool("httpbin.org");
connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org");
// Two connections re-used plus one connection created
EXPECT_EQ(connectionsNow, 3);
}
#ifdef RUN_LONG_UNIT_TESTS
TEST_F(TransportAdapter, ConnectionPoolCleaner)
{
std::string host("http://httpbin.org/get");
auto threadRoutine = [host]() {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromBuffer(*response, expectedResponseBodySize);
};
// one index expected from previous tests
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
std::cout
<< "Running Connection Pool Cleaner Test. This test takes more than 3 minutes to complete."
<< std::endl
<< "Add compiler option -DRUN_LONG_UNIT_TESTS=OFF when building if you want to skip this "
"test."
<< std::endl;
// Wait for 100 secs to make sure any previous connection is removed by the cleaner
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100));
std::cout << "First wait time done. Validating state." << std::endl;
// index is not affected by cleaner. It does not remove index
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
// cleaner should have remove connections
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0);
// Let cleaner finish
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::thread t1(threadRoutine);
std::thread t2(threadRoutine);
t1.join();
t2.join();
// 2 connections must be available at this point and one index
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2);
// At this point, cleaner should be ON and will clean connections after on second.
// After 5 seconds connection pool should have been cleaned
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100));
std::cout << "Second wait time done. Validating state." << std::endl;
// EXPECT_EQ(Http::CurlSession::ConnectionsIndexOnPool(), 0);
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0);
}
#endif
TEST_F(TransportAdapter, get204)
{
std::string host("http://mt3.google.com/generate_204");