Refactor curl connection for mocking (#869)

* split curl headers

* curl transport refactoring for curl mocking support tests
This commit is contained in:
Victor Vazquez 2020-10-29 23:44:06 -07:00 committed by GitHub
parent 6450133774
commit 1fa314f19e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 822 additions and 652 deletions

View File

@ -0,0 +1,108 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/**
* @file
* @brief The curl connection pool provides the utilities for creating a new curl connection and to
* keep a pool of connections to be re-used.
*/
#pragma once
#include "azure/core/http/curl/curl_connection.hpp"
#include "azure/core/http/http.hpp"
#include <curl/curl.h>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#ifdef TESTING_BUILD
// Define the class name that reads from ConnectionPool private members
namespace Azure { namespace Core { namespace Test {
class TransportAdapter_connectionPoolTest_Test;
}}} // namespace Azure::Core::Test
#endif
namespace Azure { namespace Core { namespace Http {
/**
* @brief CURL HTTP connection pool makes it possible to re-use one curl connection to perform
* more than one request. Use this component when connections are not re-used by default.
*
* This pool offers static methods and it is allocated statically. There can be only one
* connection pool per application.
*/
class CurlConnectionPool {
#ifdef TESTING_BUILD
// Give access to private to this tests class
friend class Azure::Core::Test::TransportAdapter_connectionPoolTest_Test;
#endif
public:
/**
* @brief Mutex for accessing connection pool for thread-safe reading and writing.
*/
static std::mutex ConnectionPoolMutex;
/**
* @brief Keeps an unique key for each host and creates a connection pool for each key.
*
* @detail This way getting a connection for a specific host can be done in O(1) instead of
* looping a single connection list to find the first connection for the required host.
*
* @remark There might be multiple connections for each host.
*/
static std::map<std::string, std::list<std::unique_ptr<CurlNetworkConnection>>>
ConnectionPoolIndex;
/**
* @brief Finds a connection to be re-used from the connection pool.
* @remark If there is not any available connection, a new connection is created.
*
* @param request HTTP request to get #CurlNetworkConnection for.
*
* @return #CurlNetworkConnection to use.
*/
static std::unique_ptr<CurlNetworkConnection> GetCurlConnection(Request& request);
/**
* @brief Moves a connection back to the pool to be re-used.
*
* @param connection CURL HTTP connection to add to the pool.
* @param lastStatusCode The most recent HTTP status code received from the \p connection.
*/
static void MoveConnectionBackToPool(
std::unique_ptr<CurlNetworkConnection> connection,
HttpStatusCode lastStatusCode);
// Class can't have instances.
CurlConnectionPool() = delete;
private:
/**
* Review all connections in the pool and removes old connections that might be already
* expired and closed its connection on server side.
*/
static void CleanUp();
static int32_t s_connectionCounter;
static bool s_isCleanConnectionsRunning;
// Removes all connections and indexes
static void ClearIndex() { CurlConnectionPool::ConnectionPoolIndex.clear(); }
// Makes possible to know the number of current connections in the connection pool for an
// index
static int64_t ConnectionsOnPool(std::string const& host)
{
auto& pool = CurlConnectionPool::ConnectionPoolIndex[host];
return pool.size();
};
// Makes possible to know the number indexes in the pool
static int64_t ConnectionsIndexOnPool()
{
return CurlConnectionPool::ConnectionPoolIndex.size();
};
};
}}} // namespace Azure::Core::Http

View File

@ -2,6 +2,7 @@
// SPDX-License-Identifier: MIT
/**
* @file
* @brief #HttpTransport implementation via CURL.
*/
@ -10,596 +11,14 @@
#ifdef BUILD_CURL_HTTP_TRANSPORT_ADAPTER
#include "azure/core/context.hpp"
#include "azure/core/http/curl/curl connection_pool.hpp"
#include "azure/core/http/curl/curl_connection.hpp"
#include "azure/core/http/curl/curl_session.hpp"
#include "azure/core/http/http.hpp"
#include "azure/core/http/policy.hpp"
#include <chrono>
#include <curl/curl.h>
#include <functional>
#include <list>
#include <map>
#include <mutex>
#include <type_traits>
#include <vector>
#ifdef TESTING_BUILD
// Define the class name that reads from ConnectionPool private members
namespace Azure { namespace Core { namespace Test {
class TransportAdapter_connectionPoolTest_Test;
}}} // namespace Azure::Core::Test
#endif
namespace Azure { namespace Core { namespace Http {
namespace Details {
// libcurl CURL_MAX_WRITE_SIZE is 64k. Using same value for default uploading chunk size.
// This can be customizable in the HttpRequest
constexpr static int64_t c_DefaultUploadChunkSize = 1024 * 64;
constexpr static auto c_DefaultLibcurlReaderSize = 1024;
// Run time error template
constexpr static const char* c_DefaultFailedToGetNewConnectionTemplate
= "Fail to get a new connection for: ";
constexpr static int c_DefaultMaxOpenNewConnectionIntentsAllowed = 10;
// 90 sec -> cleaner wait time before next clean routine
constexpr static int c_DefaultCleanerIntervalMilliseconds = 1000 * 90;
// 60 sec -> expired connection is when it waits for 60 sec or more and it's not re-used
constexpr static int c_DefaultConnectionExpiredMilliseconds = 1000 * 60;
} // namespace Details
/**
* @brief CURL HTTP connection.
*/
class CurlConnection {
private:
CURL* m_handle;
std::string m_host;
std::chrono::steady_clock::time_point m_lastUseTime;
public:
/**
* @Brief Construct CURL HTTP connection.
*
* @param host HTTP connection host name.
*/
CurlConnection(std::string const& host) : m_handle(curl_easy_init()), m_host(host) {}
/**
* @brief Destructor.
* @detail Cleans up CURL (invokes `curl_easy_cleanup()`).
*/
~CurlConnection() { curl_easy_cleanup(this->m_handle); }
/**
* @brief Get CURL handle.
* @return CURL handle for the HTTP connection.
*/
CURL* GetHandle() { return this->m_handle; }
/**
* @brief Get HTTP connection host.
* @return HTTP connection host name.
*/
std::string GetHost() const { return this->m_host; }
/**
* @brief Update last usage time for the connection.
*/
void updateLastUsageTime() { this->m_lastUseTime = std::chrono::steady_clock::now(); }
/**
* @brief Checks whether this CURL connection is expired.
* @return `true` if this connection is considered expired, `false` otherwise.
*/
bool isExpired()
{
auto connectionOnWaitingTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - this->m_lastUseTime);
return connectionOnWaitingTimeMs.count() >= Details::c_DefaultConnectionExpiredMilliseconds;
}
};
/**
* @brief CURL HTTP connection pool makes it possible to re-use one curl connection to perform
* more than one request. Use this component when connections are not re-used by default.
*
* This pool offers static methods and it is allocated statically. There can be only one
* connection pool per application.
*/
struct CurlConnectionPool
{
#ifdef TESTING_BUILD
// Give access to private to this tests class
friend class Azure::Core::Test::TransportAdapter_connectionPoolTest_Test;
#endif
/**
* @brief Mutex for accessing connection pool for thread-safe reading and writing.
*/
static std::mutex s_connectionPoolMutex;
/**
* @brief Keeps an unique key for each host and creates a connection pool for each key.
*
* @detail This way getting a connection for a specific host can be done in O(1) instead of
* looping a single connection list to find the first connection for the required host.
*
* @remark There might be multiple connections for each host.
*/
static std::map<std::string, std::list<std::unique_ptr<CurlConnection>>> s_connectionPoolIndex;
/**
* @brief Finds a connection to be re-used from the connection pool.
* @remark If there is not any available connection, a new connection is created.
*
* @param request HTTP request to get #CurlConnection for.
*
* @return #CurlConnection to use.
*/
static std::unique_ptr<CurlConnection> GetCurlConnection(Request& request);
/**
* @brief Moves a connection back to the pool to be re-used.
*
* @param connection CURL HTTP connection to add to the pool.
* @param lastStatusCode The most recent HTTP status code received from the \p connection.
*/
static void MoveConnectionBackToPool(
std::unique_ptr<CurlConnection> connection,
Http::HttpStatusCode lastStatusCode);
// Class can't have instances.
CurlConnectionPool() = delete;
private:
/**
* Review all connections in the pool and removes old connections that might be already
* expired and closed its connection on server side.
*/
static void CleanUp();
static int32_t s_connectionCounter;
static bool s_isCleanConnectionsRunning;
// Removes all connections and indexes
static void ClearIndex() { CurlConnectionPool::s_connectionPoolIndex.clear(); }
// Makes possible to know the number of current connections in the connection pool for an
// index
static int64_t ConnectionsOnPool(std::string const& host)
{
auto& pool = CurlConnectionPool::s_connectionPoolIndex[host];
return pool.size();
};
// Makes possible to know the number indexes in the pool
static int64_t ConnectionsIndexOnPool()
{
return CurlConnectionPool::s_connectionPoolIndex.size();
};
};
/**
* @brief Stateful component that controls sending an HTTP Request with libcurl over the wire.
*
* @remark This component does not use the classic libcurl easy interface to send and receive
* bytes from the network using callbacks. Instead, `CurlSession` supports working with the custom
* HTTP protocol option from libcurl to manually upload and download bytes from the network socket
* using curl_easy_send() and curl_easy_recv().
*
* @remarks This component is expected to be used by an HTTP Transporter to ensure that
* transporter to be reusable in multiple pipelines while every call to network is unique.
*/
class CurlSession : public BodyStream {
private:
/*
* Enum used by ResponseBufferParser to control the parsing internal state while building
* the HTTP RawResponse
*
*/
enum class ResponseParserState
{
StatusLine,
Headers,
EndOfHeaders,
};
/**
* @brief This is used to set the current state of a session.
*
* @remark The session needs to know what's the state on it when an exception occurs so the
* connection is not moved back to the connection pool. When a new request is going to be sent,
* the session will be in `PERFORM` until the request has been uploaded and a response code is
* received from the server. At that point the state will change to `STREAMING`.
* If there is any error before changing the state, the connection need to be cleaned up.
*
*/
enum class SessionState
{
PERFORM,
STREAMING
};
/**
* @brief stateful component used to read and parse a buffer to construct a valid HTTP
* RawResponse.
*
* @remark It uses an internal string as a buffer to accumulate a response token (version, code,
* header, etc.) until the next delimiter is found. Then it uses this string to keep building
* the HTTP RawResponse.
*
* @remark Only status line and headers are parsed and built. Body is ignored by this
* component. A libcurl session will use this component to build and return the HTTP
* RawResponse with a body stream to the pipeline.
*/
class ResponseBufferParser {
private:
/**
* @brief Controls what the parser is expecting during the reading process
*
*/
ResponseParserState state;
/**
* @brief Unique ptr to a response. Parser will create an Initial-valid HTTP RawResponse and
* then it will append headers to it. This response is moved to a different owner once
* parsing is completed.
*
*/
std::unique_ptr<RawResponse> m_response;
/**
* @brief Indicates if parser has found the end of the headers and there is nothing left for
* the HTTP RawResponse.
*
*/
bool m_parseCompleted;
bool m_delimiterStartInPrevPosition;
/**
* @brief This buffer is used when the parsed buffer doesn't contain a completed token. The
* content from the buffer will be appended to this buffer. Once that a delimiter is found,
* the token for the HTTP RawResponse is taken from this internal sting if it contains data.
*
* @remark This buffer allows a libcurl session to use any size of buffer to read from a
* socket while constructing an initial valid HTTP RawResponse. No matter if the response
* from wire contains hundreds of headers, we can use only one fixed size buffer to parse it
* all.
*
*/
std::string m_internalBuffer;
/**
* @brief This method is invoked by the Parsing process if the internal state is set to
* status code. Function will get the status-line expected tokens until finding the end of
* status line delimiter.
*
* @remark When the end of status line delimiter is found, this method will create the HTTP
* RawResponse. The HTTP RawResponse is constructed by default with body type as Stream.
*
* @param buffer Points to a memory address with all or some part of a HTTP status line.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer.
*/
int64_t BuildStatusCode(uint8_t const* const buffer, int64_t const bufferSize);
/**
* @brief This method is invoked by the Parsing process if the internal state is set to
* headers. Function will keep adding headers to the HTTP RawResponse created before while
* parsing an status line.
*
* @param buffer Points to a memory address with all or some part of a HTTP header.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer. When the returned
* value is smaller than the body size, means there is part of the body response in the
* buffer.
*/
int64_t BuildHeader(uint8_t const* const buffer, int64_t const bufferSize);
public:
/**
* @brief Construct a new RawResponse Buffer Parser object.
*
*/
ResponseBufferParser()
{
state = ResponseParserState::StatusLine;
this->m_parseCompleted = false;
this->m_delimiterStartInPrevPosition = false;
}
/**
* @brief Parses the content of a buffer to construct a valid HTTP RawResponse. This method
* is expected to be called over and over until it returns 0, indicating there is nothing
* more to parse to build the HTTP RawResponse.
*
* @param buffer points to a memory area that contains, all or some part of an HTTP
* response.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position. Returning a 0 means nothing was
* parsed and it is likely that the HTTP RawResponse is completed. Returning the same value
* as the buffer size means all buffer was parsed and the HTTP might be completed or not.
* Returning a value smaller than the buffer size will likely indicate that the HTTP
* RawResponse is completed and that the rest of the buffer contains part of the response
* body.
*/
int64_t Parse(uint8_t const* const buffer, int64_t const bufferSize);
/**
* @brief Indicates when the parser has completed parsing and building the HTTP RawResponse.
*
* @return `true` if parsing is completed. Otherwise `false`.
*/
bool IsParseCompleted() const { return this->m_parseCompleted; }
/**
* @brief Moves the internal response to a different owner.
*
* @return Will move the response only if parsing is completed and if the HTTP RawResponse
* was not moved before.
*/
std::unique_ptr<RawResponse> GetResponse()
{
if (this->m_parseCompleted && this->m_response != nullptr)
{
return std::move(this->m_response);
}
return nullptr; // parse is not completed or response has been moved already.
}
};
std::unique_ptr<CurlConnection> m_connection;
/**
* @brief libcurl socket abstraction used when working with streams.
*
*/
curl_socket_t m_curlSocket;
/**
* @brief The current state of the session.
*
* @remark The state of the session is used to determine if a connection can be moved back to
* the connection pool or not. A connection can be re-used only when the session state is
* `STREAMING` and the response has been read completely.
*
*/
SessionState m_sessionState;
/**
* @brief unique ptr for the HTTP RawResponse. The session is responsable for creating the
* response once that an HTTP status line is received.
*
*/
std::unique_ptr<RawResponse> m_response;
/**
* @brief The HTTP Request for to be used by the session.
*
*/
Request& m_request;
/**
* @brief Control field to handle the case when part of HTTP response body was copied to the
* inner buffer. When a libcurl stream tries to read part of the body, this field will help to
* decide how much data to take from the inner buffer before pulling more data from network.
*
*/
int64_t m_bodyStartInBuffer;
/**
* @brief Control field to handle the number of bytes containing relevant data within the
* internal buffer. This is because internal buffer can be set to be size N but after writing
* from wire into it, it can be holding less then N bytes.
*
*/
int64_t m_innerBufferSize;
bool m_isChunkedResponseType;
/**
* @brief This is a copy of the value of an HTTP response header `content-length`. The value
* is received as string and parsed to size_t. This field avoid parsing the string header
* every time from HTTP RawResponse.
*
* @remark This value is also used to avoid trying to read more data from network than what we
* are expecting to.
*
*/
int64_t m_contentLength;
/**
* @brief For chunked responses, this field knows the size of the current chuck size server
* will de sending
*
*/
int64_t m_chunkSize;
int64_t m_sessionTotalRead = 0;
/**
* @brief Internal buffer from a session used to read bytes from a socket. This buffer is only
* used while constructing an HTTP RawResponse without adding a body to it. Customers would
* provide their own buffer to copy from socket when reading the HTTP body using streams.
*
*/
uint8_t m_readBuffer[Details::c_DefaultLibcurlReaderSize]; // to work with libcurl custom read.
/**
* @brief convenient function that indicates when the HTTP Request will need to upload a
* payload or not.
*
* @return true if the HTTP Request will need to upload bytes to wire.
*
*/
bool isUploadRequest();
/**
* @brief Set up libcurl handle to behave as a specific HTTP Method.
*
* @return returns the libcurl result after setting up.
*/
CURLcode SetMethod();
/**
* @brief Creates a list of libcurl headers and set it up to CURLOPT_HTTPHEADER.
*
* @remark For an HTTP Request that requires uploading bytes to network, this method will set
* the content-length header and will also set libcurl to avoid sending an expect; header to
* only ask server if it is OK to upload the body.
*
* @return returns the libcurl result after setting up.
*/
CURLcode SetHeaders();
/**
* @brief Function used when working with Streams to manually write from the HTTP Request to
* the wire.
*
* @param context #Context so that operation can be canceled.
*
* @return CURL_OK when response is sent successfully.
*/
CURLcode SendRawHttp(Context const& context);
/**
* @brief Upload body.
*
* @param context #Context so that operation can be canceled.
*
* @return Curl code.
*/
CURLcode UploadBody(Context const& context);
/**
* @brief This method will use libcurl socket to write all the bytes from buffer.
*
* @remarks Hardcoded timeout is used in case a socket stop responding.
*
* @param context #Context so that operation can be canceled.
* @param buffer ptr to the data to be sent to wire.
* @param bufferSize size of the buffer to send.
* @return CURL_OK when response is sent successfully.
*/
CURLcode SendBuffer(Context const& context, uint8_t const* buffer, size_t bufferSize);
/**
* @brief This function is used after sending an HTTP request to the server to read the HTTP
* RawResponse from wire until the end of headers only.
*
* @param context #Context so that operation can be canceled.
* @param reuseInternalBuffer Indicates whether the internal buffer should be reused.
*
* @return CURL_OK when an HTTP response is created.
*/
void ReadStatusLineAndHeadersFromRawResponse(
Context const& context,
bool reuseInternalBuffer = false);
/**
* @brief Reads from inner buffer or from Wire until chunkSize is parsed and converted to
* unsigned long long
*
* @param context #Context so that operation can be canceled.
*/
void ParseChunkSize(Context const& context);
/**
* @brief This function is used when working with streams to pull more data from the wire.
* Function will try to keep pulling data from socket until the buffer is all written or until
* there is no more data to get from the socket.
*
* @param context #Context so that operation can be canceled.
* @param buffer ptr to buffer where to copy bytes from socket.
* @param bufferSize size of the buffer and the requested bytes to be pulled from wire.
* @return return the numbers of bytes pulled from socket. It can be less than what it was
* requested.
*/
int64_t ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize);
/**
* @brief Last HTTP status code read.
*/
Http::HttpStatusCode m_lastStatusCode;
/**
* @brief check whether an end of file has been reached.
* @return `true` if end of file has been reached, `false` otherwise.
*/
bool IsEOF()
{
auto eof = this->m_isChunkedResponseType ? this->m_chunkSize == 0
: this->m_contentLength == this->m_sessionTotalRead;
// `IsEOF` is called before trying to move a connection back to the connection pool.
// If the session state is `PERFORM` it means the request could not complete an upload
// operation (might have throw while uploading).
// Connection should not be moved back to the connection pool on this scenario.
return eof && m_sessionState != SessionState::PERFORM;
}
public:
/**
* @brief Construct a new Curl Session object. Init internal libcurl handler.
*
* @param request reference to an HTTP Request.
*/
CurlSession(Request& request) : m_request(request)
{
this->m_connection = CurlConnectionPool::GetCurlConnection(this->m_request);
this->m_bodyStartInBuffer = -1;
this->m_innerBufferSize = Details::c_DefaultLibcurlReaderSize;
this->m_isChunkedResponseType = false;
this->m_sessionTotalRead = 0;
}
~CurlSession() override
{
// mark connection as reusable only if entire response was read
// If not, connection can't be reused because next Read will start from what it is currently
// in the wire.
// By not moving the connection back to the pool, it gets destroyed calling the connection
// destructor to clean libcurl handle and close the connection.
// IsEOF will also handle a connection that fail to complete an upload request.
if (this->IsEOF())
{
CurlConnectionPool::MoveConnectionBackToPool(
std::move(this->m_connection), this->m_lastStatusCode);
}
}
/**
* @brief Function will use the HTTP request received in constructor to perform a network call
* based on the HTTP request configuration.
*
* @param context #Context so that operation can be canceled.
* @return CURLE_OK when the network call is completed successfully.
*/
CURLcode Perform(Context const& context);
/**
* @brief Moved the ownership of the HTTP RawResponse out of the session.
*
* @return the unique ptr to the HTTP RawResponse or null if the HTTP RawResponse is not yet
* created or was moved before.
*/
std::unique_ptr<Azure::Core::Http::RawResponse> GetResponse();
/**
* @brief Implement #BodyStream length.
*
* @return The size of the payload.
*/
int64_t Length() const override { return this->m_contentLength; }
/**
* @brief Implement #BodyStream read. Calling this function pulls data from the wire.
*
* @param context #Context so that operation can be canceled.
* @param buffer Buffer where data from wire is written to.
* @param count The number of bytes to read from the network.
* @return The actual number of bytes read from the network.
*/
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
};
/**
* @brief Concrete implementation of an HTTP Transport that uses libcurl.
*

View File

@ -0,0 +1,164 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/**
* @file
* @brief The libcurl connection keeps the curl handle and performs the data transfer to the
* network.
*/
#pragma once
#include "azure/core/http/http.hpp"
#include <chrono>
#include <curl/curl.h>
#include <string>
namespace Azure { namespace Core { namespace Http {
namespace Details {
// libcurl CURL_MAX_WRITE_SIZE is 64k. Using same value for default uploading chunk size.
// This can be customizable in the HttpRequest
constexpr static int64_t c_DefaultUploadChunkSize = 1024 * 64;
constexpr static auto c_DefaultLibcurlReaderSize = 1024;
// Run time error template
constexpr static const char* c_DefaultFailedToGetNewConnectionTemplate
= "Fail to get a new connection for: ";
constexpr static int c_DefaultMaxOpenNewConnectionIntentsAllowed = 10;
// 90 sec -> cleaner wait time before next clean routine
constexpr static int c_DefaultCleanerIntervalMilliseconds = 1000 * 90;
// 60 sec -> expired connection is when it waits for 60 sec or more and it's not re-used
constexpr static int c_DefaultConnectionExpiredMilliseconds = 1000 * 60;
} // namespace Details
/**
* @brief Interface for the connection to the network with Curl.
*
* @remark This interface enables to mock the communication to the network with any behavior for
* testing.
*
*/
class CurlNetworkConnection {
public:
/**
* @brief Allow derived classes calling a destructor.
*
*/
virtual ~CurlNetworkConnection() = default;
/**
* @brief Get HTTP connection host.
*/
virtual std::string const& GetHost() const = 0;
/**
* @brief Update last usage time for the connection.
*/
virtual void updateLastUsageTime() = 0;
/**
* @brief Checks whether this CURL connection is expired.
*/
virtual bool isExpired() = 0;
/**
* @brief This function is used when working with streams to pull more data from the wire.
* Function will try to keep pulling data from socket until the buffer is all written or until
* there is no more data to get from the socket.
*
*/
virtual int64_t ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize) = 0;
/**
* @brief This method will use libcurl socket to write all the bytes from buffer.
*
*/
virtual CURLcode SendBuffer(Context const& context, uint8_t const* buffer, size_t bufferSize)
= 0;
};
/**
* @brief CURL HTTP connection.
*/
class CurlConnection : public CurlNetworkConnection {
private:
CURL* m_handle;
curl_socket_t m_curlSocket;
std::string m_host;
std::chrono::steady_clock::time_point m_lastUseTime;
public:
/**
* @Brief Construct CURL HTTP connection.
*
* @param host HTTP connection host name.
*/
CurlConnection(CURL* handle, std::string const& host) : m_handle(handle), m_host(host)
{
// Get the socket that libcurl is using from handle. Will use this to wait while
// reading/writing
// into wire
auto result = curl_easy_getinfo(m_handle, CURLINFO_ACTIVESOCKET, &m_curlSocket);
if (result != CURLE_OK)
{
throw Http::TransportException(
Details::c_DefaultFailedToGetNewConnectionTemplate + m_host + ". "
+ std::string(curl_easy_strerror(result)));
}
}
/**
* @brief Destructor.
* @detail Cleans up CURL (invokes `curl_easy_cleanup()`).
*/
~CurlConnection() override { curl_easy_cleanup(this->m_handle); }
/**
* @brief Get HTTP connection host.
* @return HTTP connection host name.
*/
std::string const& GetHost() const override { return this->m_host; }
/**
* @brief Update last usage time for the connection.
*/
void updateLastUsageTime() override { this->m_lastUseTime = std::chrono::steady_clock::now(); }
/**
* @brief Checks whether this CURL connection is expired.
* @return `true` if this connection is considered expired, `false` otherwise.
*/
bool isExpired() override
{
auto connectionOnWaitingTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - this->m_lastUseTime);
return connectionOnWaitingTimeMs.count() >= Details::c_DefaultConnectionExpiredMilliseconds;
}
/**
* @brief This function is used when working with streams to pull more data from the wire.
* Function will try to keep pulling data from socket until the buffer is all written or until
* there is no more data to get from the socket.
*
* @param context #Context so that operation can be canceled.
* @param buffer ptr to buffer where to copy bytes from socket.
* @param bufferSize size of the buffer and the requested bytes to be pulled from wire.
* @return return the numbers of bytes pulled from socket. It can be less than what it was
* requested.
*/
int64_t ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize) override;
/**
* @brief This method will use libcurl socket to write all the bytes from buffer.
*
* @remarks Hardcoded timeout is used in case a socket stop responding.
*
* @param context #Context so that operation can be canceled.
* @param buffer ptr to the data to be sent to wire.
* @param bufferSize size of the buffer to send.
* @return CURL_OK when response is sent successfully.
*/
CURLcode SendBuffer(Context const& context, uint8_t const* buffer, size_t bufferSize) override;
};
}}} // namespace Azure::Core::Http

View File

@ -0,0 +1,386 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/**
* @file
* @brief The curl session consumes a curl connection to perform a request with it and start
* streaming the response.
*
* @remark The curl session is a body stream derived class.
*/
#pragma once
#include "azure/core/http/curl/curl connection_pool.hpp"
#include "azure/core/http/curl/curl_connection.hpp"
#include "azure/core/http/http.hpp"
#include <memory>
#include <string>
namespace Azure { namespace Core { namespace Http {
/**
* @brief Stateful component that controls sending an HTTP Request with libcurl over the wire.
*
* @remark This component does not use the classic libcurl easy interface to send and receive
* bytes from the network using callbacks. Instead, `CurlSession` supports working with the custom
* HTTP protocol option from libcurl to manually upload and download bytes from the network socket
* using curl_easy_send() and curl_easy_recv().
*
* @remarks This component is expected to be used by an HTTP Transporter to ensure that
* transporter to be reusable in multiple pipelines while every call to network is unique.
*/
class CurlSession : public BodyStream {
private:
/**
* @brief This is used to set the current state of a session.
*
* @remark The session needs to know what's the state on it when an exception occurs so the
* connection is not moved back to the connection pool. When a new request is going to be sent,
* the session will be in `PERFORM` until the request has been uploaded and a response code is
* received from the server. At that point the state will change to `STREAMING`.
* If there is any error before changing the state, the connection need to be cleaned up.
*
*/
enum class SessionState
{
PERFORM,
STREAMING
};
/*
* Enum used by ResponseBufferParser to control the parsing internal state while building
* the HTTP RawResponse
*
*/
enum class ResponseParserState
{
StatusLine,
Headers,
EndOfHeaders,
};
/**
* @brief stateful component used to read and parse a buffer to construct a valid HTTP
* RawResponse.
*
* @remark It uses an internal string as a buffer to accumulate a response token (version, code,
* header, etc.) until the next delimiter is found. Then it uses this string to keep building
* the HTTP RawResponse.
*
* @remark Only status line and headers are parsed and built. Body is ignored by this
* component. A libcurl session will use this component to build and return the HTTP
* RawResponse with a body stream to the pipeline.
*/
class ResponseBufferParser {
private:
/**
* @brief Controls what the parser is expecting during the reading process
*
*/
ResponseParserState state;
/**
* @brief Unique ptr to a response. Parser will create an Initial-valid HTTP RawResponse and
* then it will append headers to it. This response is moved to a different owner once
* parsing is completed.
*
*/
std::unique_ptr<RawResponse> m_response;
/**
* @brief Indicates if parser has found the end of the headers and there is nothing left for
* the HTTP RawResponse.
*
*/
bool m_parseCompleted;
bool m_delimiterStartInPrevPosition;
/**
* @brief This buffer is used when the parsed buffer doesn't contain a completed token. The
* content from the buffer will be appended to this buffer. Once that a delimiter is found,
* the token for the HTTP RawResponse is taken from this internal sting if it contains data.
*
* @remark This buffer allows a libcurl session to use any size of buffer to read from a
* socket while constructing an initial valid HTTP RawResponse. No matter if the response
* from wire contains hundreds of headers, we can use only one fixed size buffer to parse it
* all.
*
*/
std::string m_internalBuffer;
/**
* @brief This method is invoked by the Parsing process if the internal state is set to
* status code. Function will get the status-line expected tokens until finding the end of
* status line delimiter.
*
* @remark When the end of status line delimiter is found, this method will create the HTTP
* RawResponse. The HTTP RawResponse is constructed by default with body type as Stream.
*
* @param buffer Points to a memory address with all or some part of a HTTP status line.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer.
*/
int64_t BuildStatusCode(uint8_t const* const buffer, int64_t const bufferSize);
/**
* @brief This method is invoked by the Parsing process if the internal state is set to
* headers. Function will keep adding headers to the HTTP RawResponse created before while
* parsing an status line.
*
* @param buffer Points to a memory address with all or some part of a HTTP header.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer. When the returned
* value is smaller than the body size, means there is part of the body response in the
* buffer.
*/
int64_t BuildHeader(uint8_t const* const buffer, int64_t const bufferSize);
public:
/**
* @brief Construct a new RawResponse Buffer Parser object.
*
*/
ResponseBufferParser()
{
state = ResponseParserState::StatusLine;
m_parseCompleted = false;
m_delimiterStartInPrevPosition = false;
}
/**
* @brief Parses the content of a buffer to construct a valid HTTP RawResponse. This method
* is expected to be called over and over until it returns 0, indicating there is nothing
* more to parse to build the HTTP RawResponse.
*
* @param buffer points to a memory area that contains, all or some part of an HTTP
* response.
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position. Returning a 0 means nothing was
* parsed and it is likely that the HTTP RawResponse is completed. Returning the same value
* as the buffer size means all buffer was parsed and the HTTP might be completed or not.
* Returning a value smaller than the buffer size will likely indicate that the HTTP
* RawResponse is completed and that the rest of the buffer contains part of the response
* body.
*/
int64_t Parse(uint8_t const* const buffer, int64_t const bufferSize);
/**
* @brief Indicates when the parser has completed parsing and building the HTTP RawResponse.
*
* @return `true` if parsing is completed. Otherwise `false`.
*/
bool IsParseCompleted() const { return m_parseCompleted; }
/**
* @brief Moves the internal response to a different owner.
*
* @return Will move the response only if parsing is completed and if the HTTP RawResponse
* was not moved before.
*/
std::unique_ptr<RawResponse> GetResponse()
{
if (m_parseCompleted && m_response != nullptr)
{
return std::move(m_response);
}
return nullptr; // parse is not completed or response has been moved already.
}
};
/**
* @brief The current state of the session.
*
* @remark The state of the session is used to determine if a connection can be moved back to
* the connection pool or not. A connection can be re-used only when the session state is
* `STREAMING` and the response has been read completely.
*
*/
SessionState m_sessionState;
std::unique_ptr<CurlNetworkConnection> m_connection;
/**
* @brief unique ptr for the HTTP RawResponse. The session is responsable for creating the
* response once that an HTTP status line is received.
*
*/
std::unique_ptr<RawResponse> m_response;
/**
* @brief The HTTP Request for to be used by the session.
*
*/
Request& m_request;
/**
* @brief Control field to handle the case when part of HTTP response body was copied to the
* inner buffer. When a libcurl stream tries to read part of the body, this field will help to
* decide how much data to take from the inner buffer before pulling more data from network.
*
*/
int64_t m_bodyStartInBuffer;
/**
* @brief Control field to handle the number of bytes containing relevant data within the
* internal buffer. This is because internal buffer can be set to be size N but after writing
* from wire into it, it can be holding less then N bytes.
*
*/
int64_t m_innerBufferSize;
bool m_isChunkedResponseType;
/**
* @brief This is a copy of the value of an HTTP response header `content-length`. The value
* is received as string and parsed to size_t. This field avoid parsing the string header
* every time from HTTP RawResponse.
*
* @remark This value is also used to avoid trying to read more data from network than what we
* are expecting to.
*
*/
int64_t m_contentLength;
/**
* @brief For chunked responses, this field knows the size of the current chuck size server
* will de sending
*
*/
int64_t m_chunkSize;
int64_t m_sessionTotalRead = 0;
/**
* @brief Internal buffer from a session used to read bytes from a socket. This buffer is only
* used while constructing an HTTP RawResponse without adding a body to it. Customers would
* provide their own buffer to copy from socket when reading the HTTP body using streams.
*
*/
uint8_t m_readBuffer[Details::c_DefaultLibcurlReaderSize]; // to work with libcurl custom read.
/**
* @brief Function used when working with Streams to manually write from the HTTP Request to
* the wire.
*
* @param context #Context so that operation can be canceled.
*
* @return CURL_OK when response is sent successfully.
*/
CURLcode SendRawHttp(Context const& context);
/**
* @brief Upload body.
*
* @param context #Context so that operation can be canceled.
*
* @return Curl code.
*/
CURLcode UploadBody(Context const& context);
/**
* @brief This function is used after sending an HTTP request to the server to read the HTTP
* RawResponse from wire until the end of headers only.
*
* @param context #Context so that operation can be canceled.
* @param reuseInternalBuffer Indicates whether the internal buffer should be reused.
*
* @return CURL_OK when an HTTP response is created.
*/
void ReadStatusLineAndHeadersFromRawResponse(
Context const& context,
bool reuseInternalBuffer = false);
/**
* @brief Reads from inner buffer or from Wire until chunkSize is parsed and converted to
* unsigned long long
*
* @param context #Context so that operation can be canceled.
*/
void ParseChunkSize(Context const& context);
/**
* @brief Last HTTP status code read.
*/
Http::HttpStatusCode m_lastStatusCode;
/**
* @brief check whether an end of file has been reached.
* @return `true` if end of file has been reached, `false` otherwise.
*/
bool IsEOF()
{
auto eof = m_isChunkedResponseType ? m_chunkSize == 0 : m_contentLength == m_sessionTotalRead;
// `IsEOF` is called before trying to move a connection back to the connection pool.
// If the session state is `PERFORM` it means the request could not complete an upload
// operation (might have throw while uploading).
// Connection should not be moved back to the connection pool on this scenario.
return eof && m_sessionState != SessionState::PERFORM;
}
public:
/**
* @brief Construct a new Curl Session object. Init internal libcurl handler.
*
* @param request reference to an HTTP Request.
*/
CurlSession(Request& request, std::unique_ptr<CurlNetworkConnection> connection)
: m_connection(std::move(connection)), m_request(request)
{
m_bodyStartInBuffer = -1;
m_innerBufferSize = Details::c_DefaultLibcurlReaderSize;
m_isChunkedResponseType = false;
m_sessionTotalRead = 0;
}
~CurlSession() override
{
// mark connection as reusable only if entire response was read
// If not, connection can't be reused because next Read will start from what it is currently
// in the wire.
// By not moving the connection back to the pool, it gets destroyed calling the connection
// destructor to clean libcurl handle and close the connection.
// IsEOF will also handle a connection that fail to complete an upload request.
if (IsEOF())
{
CurlConnectionPool::MoveConnectionBackToPool(std::move(m_connection), m_lastStatusCode);
}
}
/**
* @brief Function will use the HTTP request received in constructor to perform a network call
* based on the HTTP request configuration.
*
* @param context #Context so that operation can be canceled.
* @return CURLE_OK when the network call is completed successfully.
*/
CURLcode Perform(Context const& context);
/**
* @brief Moved the ownership of the HTTP RawResponse out of the session.
*
* @return the unique ptr to the HTTP RawResponse or null if the HTTP RawResponse is not yet
* created or was moved before.
*/
std::unique_ptr<Azure::Core::Http::RawResponse> GetResponse();
/**
* @brief Implement #BodyStream length.
*
* @return The size of the payload.
*/
int64_t Length() const override { return m_contentLength; }
/**
* @brief Implement #BodyStream read. Calling this function pulls data from the wire.
*
* @param context #Context so that operation can be canceled.
* @param buffer Buffer where data from wire is written to.
* @param count The number of bytes to read from the network.
* @return The actual number of bytes read from the network.
*/
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
};
}}} // namespace Azure::Core::Http

View File

@ -142,6 +142,7 @@ void WinSocketSetBuffSize(curl_socket_t socket)
using Azure::Core::Http::CurlConnection;
using Azure::Core::Http::CurlConnectionPool;
using Azure::Core::Http::CurlNetworkConnection;
using Azure::Core::Http::CurlSession;
using Azure::Core::Http::CurlTransport;
using Azure::Core::Http::HttpStatusCode;
@ -154,7 +155,8 @@ std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request
{
// Create CurlSession to perform request
LogThis("Creating a new session.");
auto session = std::make_unique<CurlSession>(request);
auto session
= std::make_unique<CurlSession>(request, CurlConnectionPool::GetCurlConnection(request));
CURLcode performing;
// Try to send the request. If we get CURLE_UNSUPPORTED_PROTOCOL back, it means the connection is
@ -171,7 +173,8 @@ std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request
break;
}
// Let session be destroyed and create a new one to get a new connection
session = std::make_unique<CurlSession>(request);
session
= std::make_unique<CurlSession>(request, CurlConnectionPool::GetCurlConnection(request));
}
if (performing != CURLE_OK)
@ -194,15 +197,6 @@ CURLcode CurlSession::Perform(Context const& context)
// Set the session state
m_sessionState = SessionState::PERFORM;
// Get the socket that libcurl is using from handle. Will use this to wait while reading/writing
// into wire
auto result = curl_easy_getinfo(
this->m_connection->GetHandle(), CURLINFO_ACTIVESOCKET, &this->m_curlSocket);
if (result != CURLE_OK)
{
return result;
}
// LibCurl settings after connection is open (headers)
{
auto headers = this->m_request.GetHeaders();
@ -232,7 +226,7 @@ CURLcode CurlSession::Perform(Context const& context)
// somehow lost, libcurl will return CURLE_UNSUPPORTED_PROTOCOL
// (https://curl.haxx.se/libcurl/c/curl_easy_send.html). Return the error back.
LogThis("Send request without payload");
result = SendRawHttp(context);
auto result = SendRawHttp(context);
if (result != CURLE_OK)
{
return result;
@ -320,14 +314,11 @@ static std::unique_ptr<RawResponse> CreateHTTPResponse(std::string const& header
reinterpret_cast<const uint8_t*>(header.data() + header.size()));
}
bool CurlSession::isUploadRequest()
{
return this->m_request.GetMethod() == HttpMethod::Put
|| this->m_request.GetMethod() == HttpMethod::Post;
}
// Send buffer thru the wire
CURLcode CurlSession::SendBuffer(Context const& context, uint8_t const* buffer, size_t bufferSize)
CURLcode CurlConnection::SendBuffer(
Context const& context,
uint8_t const* buffer,
size_t bufferSize)
{
for (size_t sentBytesTotal = 0; sentBytesTotal < bufferSize;)
{
@ -346,10 +337,7 @@ CURLcode CurlSession::SendBuffer(Context const& context, uint8_t const* buffer,
{
size_t sentBytesPerRequest = 0;
sendResult = curl_easy_send(
this->m_connection->GetHandle(),
buffer + sentBytesTotal,
bufferSize - sentBytesTotal,
&sentBytesPerRequest);
m_handle, buffer + sentBytesTotal, bufferSize - sentBytesTotal, &sentBytesPerRequest);
switch (sendResult)
{
@ -362,7 +350,7 @@ CURLcode CurlSession::SendBuffer(Context const& context, uint8_t const* buffer,
{
// start polling operation with 1 min timeout
auto pollUntilSocketIsReady = pollSocketUntilEventOrTimeout(
context, this->m_curlSocket, PollSocketDirection::Write, 60000L);
context, m_curlSocket, PollSocketDirection::Write, 60000L);
if (pollUntilSocketIsReady == 0)
{
@ -384,7 +372,7 @@ CURLcode CurlSession::SendBuffer(Context const& context, uint8_t const* buffer,
};
}
#ifdef WINDOWS
WinSocketSetBuffSize(this->m_curlSocket);
WinSocketSetBuffSize(m_curlSocket);
#endif // WINDOWS
return CURLE_OK;
}
@ -411,7 +399,8 @@ CURLcode CurlSession::UploadBody(Context const& context)
{
break;
}
sendResult = SendBuffer(context, unique_buffer.get(), static_cast<size_t>(rawRequestLen));
sendResult = m_connection->SendBuffer(
context, unique_buffer.get(), static_cast<size_t>(rawRequestLen));
if (sendResult != CURLE_OK)
{
return sendResult;
@ -427,7 +416,7 @@ CURLcode CurlSession::SendRawHttp(Context const& context)
auto rawRequest = this->m_request.GetHTTPMessagePreBody();
int64_t rawRequestLen = rawRequest.size();
CURLcode sendResult = SendBuffer(
CURLcode sendResult = m_connection->SendBuffer(
context,
reinterpret_cast<uint8_t const*>(rawRequest.data()),
static_cast<size_t>(rawRequestLen));
@ -467,8 +456,8 @@ void CurlSession::ParseChunkSize(Context const& context)
if (index + 1 == this->m_innerBufferSize)
{ // on last index. Whatever we read is the BodyStart here
this->m_innerBufferSize
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_innerBufferSize = m_connection->ReadFromSocket(
context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
else
@ -482,8 +471,8 @@ void CurlSession::ParseChunkSize(Context const& context)
}
if (keepPolling)
{ // Read all internal buffer and \n was not found, pull from wire
this->m_innerBufferSize
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_innerBufferSize = m_connection->ReadFromSocket(
context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
}
@ -518,7 +507,8 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
{
// Try to fill internal buffer from socket.
// If response is smaller than buffer, we will get back the size of the response
bufferSize = ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
bufferSize = m_connection->ReadFromSocket(
context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
if (bufferSize == 0)
{
// closed connection, prevent application from keep trying to pull more bytes from the wire
@ -579,8 +569,8 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
// Need to move body start after chunk size
if (this->m_bodyStartInBuffer == -1)
{ // if nothing on inner buffer, pull from wire
this->m_innerBufferSize
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_innerBufferSize = m_connection->ReadFromSocket(
context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
@ -619,8 +609,8 @@ int64_t CurlSession::Read(Context const& context, uint8_t* buffer, int64_t count
}
else
{ // end of buffer, pull data from wire
this->m_innerBufferSize
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_innerBufferSize = m_connection->ReadFromSocket(
context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n)
}
}
@ -677,7 +667,7 @@ int64_t CurlSession::Read(Context const& context, uint8_t* buffer, int64_t count
// Read from socket when no more data on internal buffer
// For chunk request, read a chunk based on chunk size
totalRead = ReadFromSocket(context, buffer, static_cast<size_t>(readRequestLength));
totalRead = m_connection->ReadFromSocket(context, buffer, static_cast<size_t>(readRequestLength));
this->m_sessionTotalRead += totalRead;
// Reading 0 bytes means closed connection.
@ -701,7 +691,7 @@ int64_t CurlSession::Read(Context const& context, uint8_t* buffer, int64_t count
}
// Read from socket and return the number of bytes taken from socket
int64_t CurlSession::ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize)
int64_t CurlConnection::ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize)
{
// loop until read result is not CURLE_AGAIN
// Next loop is expected to be called at most 2 times:
@ -716,8 +706,7 @@ int64_t CurlSession::ReadFromSocket(Context const& context, uint8_t* buffer, int
size_t readBytes = 0;
for (CURLcode readResult = CURLE_AGAIN; readResult == CURLE_AGAIN;)
{
readResult = curl_easy_recv(
this->m_connection->GetHandle(), buffer, static_cast<size_t>(bufferSize), &readBytes);
readResult = curl_easy_recv(m_handle, buffer, static_cast<size_t>(bufferSize), &readBytes);
switch (readResult)
{
@ -725,7 +714,7 @@ int64_t CurlSession::ReadFromSocket(Context const& context, uint8_t* buffer, int
{
// start polling operation
auto pollUntilSocketIsReady = pollSocketUntilEventOrTimeout(
context, this->m_curlSocket, PollSocketDirection::Read, 60000L);
context, m_curlSocket, PollSocketDirection::Read, 60000L);
if (pollUntilSocketIsReady == 0)
{
@ -753,7 +742,7 @@ int64_t CurlSession::ReadFromSocket(Context const& context, uint8_t* buffer, int
}
}
#ifdef WINDOWS
WinSocketSetBuffSize(this->m_curlSocket);
WinSocketSetBuffSize(m_curlSocket);
#endif // WINDOWS
return readBytes;
}
@ -1006,24 +995,24 @@ int64_t CurlSession::ResponseBufferParser::BuildHeader(
return indexOfEndOfStatusLine + 1 - buffer;
}
std::mutex CurlConnectionPool::s_connectionPoolMutex;
std::map<std::string, std::list<std::unique_ptr<CurlConnection>>>
CurlConnectionPool::s_connectionPoolIndex;
std::mutex CurlConnectionPool::ConnectionPoolMutex;
std::map<std::string, std::list<std::unique_ptr<CurlNetworkConnection>>>
CurlConnectionPool::ConnectionPoolIndex;
int32_t CurlConnectionPool::s_connectionCounter = 0;
bool CurlConnectionPool::s_isCleanConnectionsRunning = false;
std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& request)
std::unique_ptr<CurlNetworkConnection> CurlConnectionPool::GetCurlConnection(Request& request)
{
std::string const& host = request.GetUrl().GetHost();
{
// Critical section. Needs to own s_connectionPoolMutex before executing
// Critical section. Needs to own ConnectionPoolMutex before executing
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
std::lock_guard<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
// get a ref to the pool from the map of pools
auto hostPoolIndex = CurlConnectionPool::s_connectionPoolIndex.find(host);
if (hostPoolIndex != CurlConnectionPool::s_connectionPoolIndex.end()
auto hostPoolIndex = CurlConnectionPool::ConnectionPoolIndex.find(host);
if (hostPoolIndex != CurlConnectionPool::ConnectionPoolIndex.end()
&& hostPoolIndex->second.size() > 0)
{
// get ref to first connection
@ -1038,7 +1027,7 @@ std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& r
// Remove index if there are no more connections
if (hostPoolIndex->second.size() == 0)
{
CurlConnectionPool::s_connectionPoolIndex.erase(hostPoolIndex);
CurlConnectionPool::ConnectionPoolIndex.erase(hostPoolIndex);
}
// return connection ref
@ -1048,45 +1037,46 @@ std::unique_ptr<CurlConnection> CurlConnectionPool::GetCurlConnection(Request& r
// Creating a new connection is thread safe. No need to lock mutex here.
// No available connection for the pool for the required host. Create one
auto newConnection = std::make_unique<CurlConnection>(host);
CURL* newHandle = curl_easy_init();
// Libcurl setup before open connection (url, connect_only, timeout)
SetLibcurlOption(
newConnection->GetHandle(),
newHandle,
CURLOPT_URL,
request.GetUrl().GetAbsoluteUrl().data(),
Details::c_DefaultFailedToGetNewConnectionTemplate + host);
SetLibcurlOption(
newConnection->GetHandle(),
newHandle,
CURLOPT_CONNECT_ONLY,
1L,
Details::c_DefaultFailedToGetNewConnectionTemplate + host);
// curl_easy_setopt(newConnection->GetHandle(), CURLOPT_VERBOSE, 1L);
// curl_easy_setopt(newHandle, CURLOPT_VERBOSE, 1L);
// Set timeout to 24h. Libcurl will fail uploading on windows if timeout is:
// timeout >= 25 days. Fails as soon as trying to upload any data
// 25 days < timeout > 1 days. Fail on huge uploads ( > 1GB)
SetLibcurlOption(
newConnection->GetHandle(),
newHandle,
CURLOPT_TIMEOUT,
60L * 60L * 24L,
Details::c_DefaultFailedToGetNewConnectionTemplate + host);
auto result = curl_easy_perform(newConnection->GetHandle());
if (result != CURLE_OK)
auto performResult = curl_easy_perform(newHandle);
if (performResult != CURLE_OK)
{
throw Http::TransportException(
Details::c_DefaultFailedToGetNewConnectionTemplate + host + ". "
+ std::string(curl_easy_strerror(result)));
+ std::string(curl_easy_strerror(performResult)));
}
return newConnection;
return std::make_unique<CurlConnection>(newHandle, host);
}
// Move the connection back to the connection pool. Push it to the front so it becomes the first
// connection to be picked next time some one ask for a connection to the pool (LIFO)
void CurlConnectionPool::MoveConnectionBackToPool(
std::unique_ptr<CurlConnection> connection,
std::unique_ptr<CurlNetworkConnection> connection,
Http::HttpStatusCode lastStatusCode)
{
auto code = static_cast<std::underlying_type<Http::HttpStatusCode>::type>(lastStatusCode);
@ -1098,8 +1088,8 @@ void CurlConnectionPool::MoveConnectionBackToPool(
}
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
auto& hostPool = CurlConnectionPool::s_connectionPoolIndex[connection->GetHost()];
std::lock_guard<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
auto& hostPool = CurlConnectionPool::ConnectionPoolIndex[connection->GetHost()];
// update the time when connection was moved back to pool
connection->updateLastUsageTime();
hostPool.push_front(std::move(connection));
@ -1125,7 +1115,7 @@ void CurlConnectionPool::CleanUp()
{
// take mutex for reading the pool
std::lock_guard<std::mutex> lock(CurlConnectionPool::s_connectionPoolMutex);
std::lock_guard<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
if (CurlConnectionPool::s_connectionCounter == 0)
{
@ -1135,8 +1125,8 @@ void CurlConnectionPool::CleanUp()
}
// loop the connection pool index
for (auto index = CurlConnectionPool::s_connectionPoolIndex.begin();
index != CurlConnectionPool::s_connectionPoolIndex.end();
for (auto index = CurlConnectionPool::ConnectionPoolIndex.begin();
index != CurlConnectionPool::ConnectionPoolIndex.end();
index++)
{
if (index->second.size() == 0)

View File

@ -23,6 +23,7 @@ include(GoogleTest)
add_executable (
${TARGET_NAME}
context.cpp
curl_session_test.cpp
datetime.cpp
http.cpp
logging.cpp
@ -37,7 +38,7 @@ add_executable (
uuid.cpp
)
target_link_libraries(${TARGET_NAME} PRIVATE azure-core gtest)
target_link_libraries(${TARGET_NAME} PRIVATE azure-core gtest gmock)
# gtest_add_tests will scan the test from azure-core-test and call add_test
# for each test to ctest. This enables `ctest -r` to run specific tests directly.

View File

@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/**
* @file
* @brief The base class for testing a curl session.
*
* @remark The curl connection mock is defined here.
*
*/
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <string>
#include <azure/core/http/curl/curl.hpp>
namespace Azure { namespace Core { namespace Test {
class CurlSession : public ::testing::Test {
};
/**
* @brief mock the network connection
*
*/
class MockCurlNetworkConnection : public Azure::Core::Http::CurlNetworkConnection {
public:
MOCK_METHOD(std::string const&, GetHost, (), (const, override));
MOCK_METHOD(void, updateLastUsageTime, (), (override));
MOCK_METHOD(bool, isExpired, (), (override));
MOCK_METHOD(
int64_t,
ReadFromSocket,
(Context const& context, uint8_t* buffer, int64_t bufferSize),
(override));
MOCK_METHOD(
CURLcode,
SendBuffer,
(Context const& context, uint8_t const* buffer, size_t bufferSize),
(override));
};
}}} // namespace Azure::Core::Test

View File

@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "curl_session.hpp"
#include <azure/core/http/curl/curl.hpp>
#include <azure/core/http/http.hpp>
using ::testing::_;
using ::testing::DoAll;
using ::testing::Return;
using ::testing::SetArrayArgument;
namespace Azure { namespace Core { namespace Test {
TEST_F(CurlSession, successCall)
{
std::string response(
"HTTP/1.1 200 Ok\r\nContent-Type: text/html; charset=UTF-8\r\n\r\n{\r\n\"somejson\":45\r}");
// Can't mock the curMock directly from a unique ptr, heap allocate it first and then make a
// unique ptr for it
MockCurlNetworkConnection* curlMock = new MockCurlNetworkConnection();
EXPECT_CALL(*curlMock, SendBuffer(_, _, _)).WillOnce(Return(CURLE_OK));
EXPECT_CALL(*curlMock, ReadFromSocket(_, _, _))
.WillOnce(DoAll(
SetArrayArgument<1>(response.data(), response.data() + response.size()),
Return(response.size())));
// Create the unique ptr to take care about memory free at the end
std::unique_ptr<MockCurlNetworkConnection> uniqueCurlMock(curlMock);
// Simulate a request to be sent
Azure::Core::Http::Url url("http://microsoft.com");
Azure::Core::Http::Request request(Azure::Core::Http::HttpMethod::Get, url);
// Move the curlMock to build a session and then send the request
// The session will get the response we mock before, so it will pass for this GET
auto session
= std::make_unique<Azure::Core::Http::CurlSession>(request, std::move(uniqueCurlMock));
EXPECT_NO_THROW(session->Perform(Azure::Core::GetApplicationContext()));
}
}}} // namespace Azure::Core::Test

View File

@ -1,6 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/**
* @file
* @brief The base class for the HTTP test cases.
*
*/
#include "gtest/gtest.h"
#include <azure/core/http/http.hpp>

View File

@ -1,6 +1,14 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
/**
* @file
* @brief The base class for the common bahavior of the transport adapter tests.
*
* @brief Any http trasport adapter can be used for this tests.
*
*/
#include "gtest/gtest.h"
#include <azure/core/http/body_stream.hpp>
#include <azure/core/http/curl/curl.hpp>