Fix for connection re-use on response error (#548)

* Fix for connection re-use on response error
This commit is contained in:
Victor Vazquez 2020-08-28 05:45:33 +00:00 committed by GitHub
parent e782efbb47
commit 6e33262be5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 176 additions and 115 deletions

View File

@ -93,7 +93,9 @@ namespace Azure { namespace Core { namespace Http {
/**
* Moves a connection back to the pool to be re-used
*/
static void MoveConnectionBackToPool(std::unique_ptr<CurlConnection> connection);
static void MoveConnectionBackToPool(
std::unique_ptr<CurlConnection> connection,
Http::HttpStatusCode lastStatusCode);
// Class can't have instances.
CurlConnectionPool() = delete;
@ -107,6 +109,8 @@ namespace Azure { namespace Core { namespace Http {
static int32_t s_connectionCounter;
static bool s_isCleanConnectionsRunning;
// Removes all connections and indexes
static void ClearIndex() { CurlConnectionPool::s_connectionPoolIndex.clear(); }
// Makes possible to know the number of current connections in the connection pool for an
// index
@ -427,7 +431,7 @@ namespace Azure { namespace Core { namespace Http {
*
* @return CURL_OK when an HTTP response is created.
*/
void ReadStatusLineAndHeadersFromRawResponse();
void ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUffer = false);
/**
* @brief Reads from inner buffer or from Wire until chunkSize is parsed and converted to
@ -448,6 +452,8 @@ namespace Azure { namespace Core { namespace Http {
*/
int64_t ReadFromSocket(uint8_t* buffer, int64_t bufferSize);
Http::HttpStatusCode m_lastStatusCode;
bool IsEOF()
{
return this->m_isChunkedResponseType ? this->m_chunkSize == 0
@ -477,9 +483,10 @@ namespace Azure { namespace Core { namespace Http {
// in the wire.
// By not moving the connection back to the pool, it gets destroyed calling the connection
// destructor to clean libcurl handle and close the connection.
if (IsEOF())
if (this->IsEOF())
{
CurlConnectionPool::MoveConnectionBackToPool(std::move(this->m_connection));
CurlConnectionPool::MoveConnectionBackToPool(
std::move(this->m_connection), this->m_lastStatusCode);
}
}

View File

@ -109,11 +109,19 @@ CURLcode CurlSession::Perform(Context const& context)
// Check server response from Expect:100-continue for PUT;
// This help to prevent us from start uploading data when Server can't handle it
if (this->m_response->GetStatusCode() != HttpStatusCode::Continue)
if (this->m_lastStatusCode != HttpStatusCode::Continue)
{
return result; // Won't upload.
}
if (this->m_bodyStartInBuffer > 0)
{
// If internal buffer has more data after the 100-continue means Server return an error.
// We don't need to upload body, just parse the response from Server and return
ReadStatusLineAndHeadersFromRawResponse(true);
return result;
}
// Start upload
result = this->UploadBody(context);
if (result != CURLE_OK)
@ -334,7 +342,7 @@ void CurlSession::ParseChunkSize()
}
// Read status line plus headers to create a response with no body
void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUffer)
{
auto parser = ResponseBufferParser();
auto bufferSize = int64_t();
@ -342,12 +350,27 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// Keep reading until all headers were read
while (!parser.IsParseCompleted())
{
// Try to fill internal buffer from socket.
// If response is smaller than buffer, we will get back the size of the response
bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
// returns the number of bytes parsed up to the body Start
auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
int64_t bytesParsed = 0;
if (reUseInternalBUffer)
{
// parse from internal buffer. This means previous read from server got more than one response.
// This happens when Server returns a 100-continue plus an error code
bufferSize = this->m_innerBufferSize - this->m_bodyStartInBuffer;
bytesParsed = parser.Parse(
this->m_readBuffer + this->m_bodyStartInBuffer, static_cast<size_t>(bufferSize));
// if parsing from internal buffer is not enough, do next read from wire
reUseInternalBUffer = false;
// reset body start
this->m_bodyStartInBuffer = -1;
}
else
{
// Try to fill internal buffer from socket.
// If response is smaller than buffer, we will get back the size of the response
bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
// returns the number of bytes parsed up to the body Start
bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
}
if (bytesParsed < bufferSize)
{
@ -357,6 +380,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
this->m_response = parser.GetResponse();
this->m_innerBufferSize = static_cast<size_t>(bufferSize);
this->m_lastStatusCode = this->m_response->GetStatusCode();
// For Head request, set the length of body response to 0.
// Response will give us content-length as if we were not doing Head saying what would it be the
@ -364,7 +388,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// For NoContent status code, also need to set conentLength to 0.
// https://github.com/Azure/azure-sdk-for-cpp/issues/406
if (this->m_request.GetMethod() == HttpMethod::Head
|| this->m_response->GetStatusCode() == Azure::Core::Http::HttpStatusCode::NoContent)
|| this->m_lastStatusCode == Azure::Core::Http::HttpStatusCode::NoContent)
{
this->m_contentLength = 0;
this->m_bodyStartInBuffer = -1;
@ -810,7 +834,7 @@ std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& r
CurlConnectionPool::s_connectionCounter -= 1;
// Remove index if there are no more connections
if (CurlConnectionPool::s_connectionPoolIndex.size() == 0)
if (hostPoolIndex->second.size() == 0)
{
CurlConnectionPool::s_connectionPoolIndex.erase(hostPoolIndex);
}
@ -830,15 +854,16 @@ std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& r
if (result != CURLE_OK)
{
throw std::runtime_error(
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". Could not set URL.");
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". "
+ std::string(curl_easy_strerror(result)));
}
result = curl_easy_setopt(newConnection->GetHandle(), CURLOPT_CONNECT_ONLY, 1L);
if (result != CURLE_OK)
{
throw std::runtime_error(
Details::c_DefaultFailedToGetNewConnectionTemplate + host
+ ". Could not set connect only ON.");
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". "
+ std::string(curl_easy_strerror(result)));
}
// curl_easy_setopt(newConnection->GetHandle(), CURLOPT_VERBOSE, 1L);
@ -849,22 +874,32 @@ std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& r
if (result != CURLE_OK)
{
throw std::runtime_error(
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". Could not set timeout.");
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". "
+ std::string(curl_easy_strerror(result)));
}
result = curl_easy_perform(newConnection->GetHandle());
if (result != CURLE_OK)
{
throw std::runtime_error(
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". Could not open connection.");
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". "
+ std::string(curl_easy_strerror(result)));
}
return newConnection;
}
// Move the connection back to the connection pool. Push it to the front so it becomes the first
// connection to be picked next time some one ask for a connection to the pool (LIFO)
void CurlConnectionPool::MoveConnectionBackToPool(std::unique_ptr<CurlConnection> connection)
void CurlConnectionPool::MoveConnectionBackToPool(std::unique_ptr<CurlConnection> connection, Http::HttpStatusCode lastStatusCode)
{
auto code = static_cast<std::underlying_type<Http::HttpStatusCode>::type>(lastStatusCode);
// laststatusCode = 0
if (code < 200 || code >= 300)
{
// A hanlder with previos response with Error can't be re-use.
return;
}
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
auto& hostPool = CurlConnectionPool::s_connectionPoolIndex[connection->GetHost()];

View File

@ -21,7 +21,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
add_executable (
${TARGET_NAME}
file_upload.cpp
http.cpp
logging.cpp
main.cpp
@ -30,6 +29,7 @@ add_executable (
telemetry_policy.cpp
transport_adapter.cpp
transport_adapter.hpp
transport_adapter_file_upload.cpp
uuid.cpp
context.cpp)

View File

@ -44,96 +44,6 @@ namespace Azure { namespace Core { namespace Test {
CheckBodyFromBuffer(*response, expectedResponseBodySize + 6 + 13);
}
// multiThread test requires `ConnectionsOnPool` hook which is only available when building
// TESTING_BUILD. This test cases are only built when that case is true.`
TEST_F(TransportAdapter, getMultiThread)
{
std::string host("http://httpbin.org/get");
auto threadRoutine = [host]() {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromBuffer(*response, expectedResponseBodySize);
};
std::thread t1(threadRoutine);
std::thread t2(threadRoutine);
t1.join();
t2.join();
auto connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org");
// 2 connections must be available at this point
EXPECT_EQ(connectionsNow, 2);
std::thread t3(threadRoutine);
std::thread t4(threadRoutine);
std::thread t5(threadRoutine);
t3.join();
t4.join();
t5.join();
connectionsNow = Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org");
// Two connections re-used plus one connection created
EXPECT_EQ(connectionsNow, 3);
}
#ifdef RUN_LONG_UNIT_TESTS
TEST_F(TransportAdapter, ConnectionPoolCleaner)
{
std::string host("http://httpbin.org/get");
auto threadRoutine = [host]() {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromBuffer(*response, expectedResponseBodySize);
};
// one index expected from previous tests
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
std::cout
<< "Running Connection Pool Cleaner Test. This test takes more than 3 minutes to complete."
<< std::endl
<< "Add compiler option -DRUN_LONG_UNIT_TESTS=OFF when building if you want to skip this "
"test."
<< std::endl;
// Wait for 100 secs to make sure any previous connection is removed by the cleaner
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100));
std::cout << "First wait time done. Validating state." << std::endl;
// index is not affected by cleaner. It does not remove index
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
// cleaner should have remove connections
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0);
// Let cleaner finish
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::thread t1(threadRoutine);
std::thread t2(threadRoutine);
t1.join();
t2.join();
// 2 connections must be available at this point and one index
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2);
// At this point, cleaner should be ON and will clean connections after on second.
// After 5 seconds connection pool should have been cleaned
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100));
std::cout << "Second wait time done. Validating state." << std::endl;
// EXPECT_EQ(Http::CurlSession::ConnectionsIndexOnPool(), 0);
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0);
}
#endif
TEST_F(TransportAdapter, get204)
{
std::string host("http://mt3.google.com/generate_204");
@ -242,6 +152,115 @@ namespace Azure { namespace Core { namespace Test {
CheckBodyFromBuffer(*response, expectedResponseBodySize, expectedChunkResponse);
}
TEST_F(TransportAdapter, putErrorResponse)
{
std::string host("http://httpbin.org/get");
// Try to make a PUT to a GET url. This will return an error code from server.
// This test makes sure that the connection is not re-used (because it gets closed by server)
// and next request is not hang
for (auto i = 0; i < 10; i++)
{
auto requestBodyVector = std::vector<uint8_t>(10, 'x');
auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector);
auto request
= Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Put, host, &bodyRequest);
auto response = pipeline.Send(context, request);
}
}
// multiThread test requires `ConnectionsOnPool` hook which is only available when building
// TESTING_BUILD. This test cases are only built when that case is true.`
TEST_F(TransportAdapter, getMultiThread)
{
std::string host("http://httpbin.org/get");
Azure::Core::Http::CurlConnectionPool::ClearIndex();
auto threadRoutine = [host]() {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromBuffer(*response, expectedResponseBodySize);
};
std::thread t1(threadRoutine);
std::thread t2(threadRoutine);
t1.join();
t2.join();
// wait a few ms for connections to go back to pool.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// 2 connections must be available at this point
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2);
std::thread t3(threadRoutine);
std::thread t4(threadRoutine);
std::thread t5(threadRoutine);
t3.join();
t4.join();
t5.join();
// wait a few ms for connections to go back to pool.
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// Two connections re-used plus one connection created
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 3);
}
#ifdef RUN_LONG_UNIT_TESTS
TEST_F(TransportAdapter, ConnectionPoolCleaner)
{
std::string host("http://httpbin.org/get");
auto threadRoutine = [host]() {
auto request = Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Get, host);
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromBuffer(*response, expectedResponseBodySize);
};
// one index expected from previous tests
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
std::cout
<< "Running Connection Pool Cleaner Test. This test takes more than 3 minutes to complete."
<< std::endl
<< "Add compiler option -DRUN_LONG_UNIT_TESTS=OFF when building if you want to skip this "
"test."
<< std::endl;
// Wait for 100 secs to make sure any previous connection is removed by the cleaner
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100));
std::cout << "First wait time done. Validating state." << std::endl;
// index is not affected by cleaner. It does not remove index
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
// cleaner should have remove connections
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0);
// Let cleaner finish
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::thread t1(threadRoutine);
std::thread t2(threadRoutine);
t1.join();
t2.join();
// 2 connections must be available at this point and one index
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsIndexOnPool(), 1);
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 2);
// At this point, cleaner should be ON and will clean connections after on second.
// After 5 seconds connection pool should have been cleaned
std::this_thread::sleep_for(std::chrono::milliseconds(1000 * 100));
std::cout << "Second wait time done. Validating state." << std::endl;
// EXPECT_EQ(Http::CurlSession::ConnectionsIndexOnPool(), 0);
EXPECT_EQ(Http::CurlConnectionPool::ConnectionsOnPool("httpbin.org"), 0);
}
#endif
// **********************
// ***Same tests but getting stream to pull from socket, simulating the Download Op
// **********************

View File

@ -46,7 +46,7 @@ namespace Azure { namespace Core { namespace Test {
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
EXPECT_EQ(bodySize, size);
}
if (expectedBody.size() > 0)
@ -81,7 +81,7 @@ namespace Azure { namespace Core { namespace Test {
}
}
TEST_F(TransportAdapter, customSizePutFromFile)
TEST_F(TransportAdapter, SizePutFromFile)
{
std::string host("http://httpbin.org/put");
std::string testDataPath(AZURE_TEST_DATA_PATH);
@ -116,7 +116,7 @@ namespace Azure { namespace Core { namespace Test {
}
}
TEST_F(TransportAdapter, customSizePutFromFileDefault)
TEST_F(TransportAdapter, SizePutFromFileDefault)
{
std::string host("http://httpbin.org/put");
std::string testDataPath(AZURE_TEST_DATA_PATH);
@ -150,7 +150,7 @@ namespace Azure { namespace Core { namespace Test {
}
}
TEST_F(TransportAdapter, customSizePutFromFileBiggerPage)
TEST_F(TransportAdapter, SizePutFromFileBiggerPage)
{
std::string host("http://httpbin.org/put");
std::string testDataPath(AZURE_TEST_DATA_PATH);