use size_t for buffers in libcurl internal impl (#2376)

* use size_t for buffers in libcurl internal impl
This commit is contained in:
Victor Vazquez 2021-06-14 14:28:36 -07:00 committed by GitHub
parent c6a00a97b4
commit 81e0d5aab0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 133 additions and 97 deletions

View File

@ -399,7 +399,7 @@ CURLcode CurlSession::Perform(Context const& context)
}
Log::Write(Logger::Level::Verbose, LogMsgPrefix + "Upload payload");
if (this->m_bodyStartInBuffer > 0)
if (this->m_bodyStartInBuffer < this->m_innerBufferSize)
{
// If internal buffer has more data after the 100-continue means Server return an error.
// We don't need to upload body, just parse the response from Server and return
@ -556,7 +556,7 @@ CURLcode CurlSession::SendRawHttp(Context const& context)
{
// something like GET /path HTTP1.0 \r\nheaders\r\n
auto rawRequest = GetHTTPMessagePreBody(this->m_request);
int64_t rawRequestLen = rawRequest.size();
auto rawRequestLen = rawRequest.size();
CURLcode sendResult = m_connection->SendBuffer(
reinterpret_cast<uint8_t const*>(rawRequest.data()),
@ -581,20 +581,20 @@ void CurlSession::ParseChunkSize(Context const& context)
// Move to after chunk size
for (bool keepPolling = true; keepPolling;)
{
for (int64_t index = this->m_bodyStartInBuffer, i = 0; index < this->m_innerBufferSize;
index++, i++)
for (size_t index = this->m_bodyStartInBuffer, iteration = 0; index < this->m_innerBufferSize;
index++, iteration++)
{
strChunkSize.append(reinterpret_cast<char*>(&this->m_readBuffer[index]), 1);
if (i > 1 && this->m_readBuffer[index] == '\n')
if (iteration > 1 && this->m_readBuffer[index] == '\n')
{
// get chunk size. Chunk size comes in Hex value
try
{
this->m_chunkSize = static_cast<int64_t>(std::stoull(strChunkSize, nullptr, 16));
// Required cast for MSVC x86
this->m_chunkSize = static_cast<size_t>(std::stoull(strChunkSize, nullptr, 16));
}
catch (const std::invalid_argument& ex)
catch (std::invalid_argument const&)
{
(void)ex;
// Server can return something like `\n\r\n` for a chunk of zero length data. This is
// allowed by RFC. `stoull` will throw invalid_argument if there is not at least one hex
// digit to be parsed. For those cases, we consider the response as zero-length.
@ -656,23 +656,22 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
bool reuseInternalBuffer)
{
auto parser = ResponseBufferParser();
auto bufferSize = int64_t();
auto bufferSize = size_t();
// Keep reading until all headers were read
while (!parser.IsParseCompleted())
{
int64_t bytesParsed = 0;
size_t bytesParsed = 0;
if (reuseInternalBuffer)
{
// parse from internal buffer. This means previous read from server got more than one
// response. This happens when Server returns a 100-continue plus an error code
bufferSize = this->m_innerBufferSize - this->m_bodyStartInBuffer;
bytesParsed = parser.Parse(
this->m_readBuffer + this->m_bodyStartInBuffer, static_cast<size_t>(bufferSize));
bytesParsed = parser.Parse(this->m_readBuffer + this->m_bodyStartInBuffer, bufferSize);
// if parsing from internal buffer is not enough, do next read from wire
reuseInternalBuffer = false;
// reset body start
this->m_bodyStartInBuffer = -1;
this->m_bodyStartInBuffer = _detail::DefaultLibcurlReaderSize;
}
else
{
@ -687,7 +686,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
"Connection was closed by the server while trying to read a response");
}
// returns the number of bytes parsed up to the body Start
bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
bytesParsed = parser.Parse(this->m_readBuffer, bufferSize);
}
if (bytesParsed < bufferSize)
@ -697,7 +696,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
}
this->m_response = parser.ExtractResponse();
this->m_innerBufferSize = static_cast<size_t>(bufferSize);
this->m_innerBufferSize = bufferSize;
this->m_lastStatusCode = this->m_response->GetStatusCode();
// For Head request, set the length of body response to 0.
@ -709,7 +708,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
|| this->m_lastStatusCode == HttpStatusCode::NoContent)
{
this->m_contentLength = 0;
this->m_bodyStartInBuffer = -1;
this->m_bodyStartInBuffer = _detail::DefaultLibcurlReaderSize;
return;
}
@ -724,6 +723,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
return;
}
// No content-length from headers, check transfer-encoding
this->m_contentLength = -1;
auto isTransferEncodingHeaderInResponse = headers.find("transfer-encoding");
if (isTransferEncodingHeaderInResponse != headers.end())
@ -738,10 +738,17 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
this->m_isChunkedResponseType = true;
// Need to move body start after chunk size
if (this->m_bodyStartInBuffer == -1)
if (this->m_bodyStartInBuffer >= this->m_innerBufferSize)
{ // if nothing on inner buffer, pull from wire
this->m_innerBufferSize = m_connection->ReadFromSocket(
this->m_readBuffer, _detail::DefaultLibcurlReaderSize, context);
if (this->m_innerBufferSize == 0)
{
// closed connection, prevent application from keep trying to pull more bytes from the
// wire
throw TransportException(
"Connection was closed by the server while trying to read a response");
}
this->m_bodyStartInBuffer = 0;
}
@ -758,13 +765,25 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
*/
}
/**
* @brief Reads data from network and validates the data is equal to \p expected.
*
* @param expected The data that should came from the wire.
* @param context A context to control the request lifetime.
*/
void CurlSession::ReadExpected(uint8_t expected, Context const& context)
{
if (this->m_bodyStartInBuffer == -1 || this->m_bodyStartInBuffer == this->m_innerBufferSize)
if (this->m_bodyStartInBuffer >= this->m_innerBufferSize)
{
// end of buffer, pull data from wire
this->m_innerBufferSize = m_connection->ReadFromSocket(
this->m_readBuffer, _detail::DefaultLibcurlReaderSize, context);
if (this->m_innerBufferSize == 0)
{
// closed connection, prevent application from keep trying to pull more bytes from the wire
throw TransportException(
"Connection was closed by the server while trying to read a response");
}
this->m_bodyStartInBuffer = 0;
}
auto data = this->m_readBuffer[this->m_bodyStartInBuffer];
@ -812,47 +831,40 @@ size_t CurlSession::OnRead(uint8_t* buffer, size_t count, Context const& context
}
auto totalRead = size_t();
int64_t readRequestLength = this->m_isChunkedResponseType
? (std::min)(this->m_chunkSize - this->m_sessionTotalRead, static_cast<int64_t>(count))
: static_cast<int64_t>(count);
size_t readRequestLength = this->m_isChunkedResponseType
? (std::min)(this->m_chunkSize - this->m_sessionTotalRead, count)
: count;
// For responses with content-length, avoid trying to read beyond Content-length or
// libcurl could return a second response as BadRequest.
// https://github.com/Azure/azure-sdk-for-cpp/issues/306
if (this->m_contentLength > 0)
{
auto remainingBodyContent = this->m_contentLength - this->m_sessionTotalRead;
size_t remainingBodyContent
= static_cast<size_t>(this->m_contentLength) - this->m_sessionTotalRead;
readRequestLength = (std::min)(readRequestLength, remainingBodyContent);
}
// Take data from inner buffer if any
if (this->m_bodyStartInBuffer >= 0)
if (this->m_bodyStartInBuffer < this->m_innerBufferSize)
{
// still have data to take from innerbuffer
// TODO: Change the fields to be size_t for a less error-prone implementation
// The casts here are safe to do because we know the buffers and the offset are within the
// range.
Azure::Core::IO::MemoryBodyStream innerBufferMemoryStream(
this->m_readBuffer + this->m_bodyStartInBuffer,
static_cast<size_t>(this->m_innerBufferSize - this->m_bodyStartInBuffer));
this->m_innerBufferSize - this->m_bodyStartInBuffer);
// From code inspection, it is guaranteed that the readRequestLength will fit within size_t
// since count is bounded by size_t.
totalRead
= innerBufferMemoryStream.Read(buffer, static_cast<size_t>(readRequestLength), context);
totalRead = innerBufferMemoryStream.Read(buffer, readRequestLength, context);
this->m_bodyStartInBuffer += totalRead;
this->m_sessionTotalRead += totalRead;
if (this->m_bodyStartInBuffer == this->m_innerBufferSize)
{
this->m_bodyStartInBuffer = -1; // read everything from inner buffer already
}
return totalRead;
}
// Head request have contentLength = 0, so we won't read more, just return 0
// Also if we have already read all contentLength
if (this->m_sessionTotalRead == this->m_contentLength || this->IsEOF())
if (this->m_sessionTotalRead == static_cast<size_t>(this->m_contentLength) || this->IsEOF())
{
return 0;
}
@ -949,9 +961,9 @@ size_t CurlConnection::ReadFromSocket(uint8_t* buffer, size_t bufferSize, Contex
std::unique_ptr<RawResponse> CurlSession::ExtractResponse() { return std::move(this->m_response); }
int64_t CurlSession::ResponseBufferParser::Parse(
size_t CurlSession::ResponseBufferParser::Parse(
uint8_t const* const buffer,
int64_t const bufferSize)
size_t const bufferSize)
{
if (this->m_parseCompleted)
{
@ -959,7 +971,7 @@ int64_t CurlSession::ResponseBufferParser::Parse(
}
// Read all buffer until \r\n is found
int64_t start = 0, index = 0;
size_t start = 0, index = 0;
for (; index < bufferSize; index++)
{
if (buffer[index] == '\r')
@ -1069,9 +1081,9 @@ int64_t CurlSession::ResponseBufferParser::Parse(
}
// Finds delimiter '\r' as the end of the
int64_t CurlSession::ResponseBufferParser::BuildStatusCode(
size_t CurlSession::ResponseBufferParser::BuildStatusCode(
uint8_t const* const buffer,
int64_t const bufferSize)
size_t const bufferSize)
{
if (this->state != ResponseParserState::StatusLine)
{
@ -1120,9 +1132,9 @@ int64_t CurlSession::ResponseBufferParser::BuildStatusCode(
}
// Finds delimiter '\r' as the end of the
int64_t CurlSession::ResponseBufferParser::BuildHeader(
size_t CurlSession::ResponseBufferParser::BuildHeader(
uint8_t const* const buffer,
int64_t const bufferSize)
size_t const bufferSize)
{
if (this->state != ResponseParserState::Headers)
{

View File

@ -119,7 +119,7 @@ namespace Azure { namespace Core { namespace Http { namespace _detail {
// Makes possible to know the number of current connections in the connection pool for an
// index
int64_t ConnectionsOnPool(std::string const& host) { return ConnectionPoolIndex[host].size(); };
size_t ConnectionsOnPool(std::string const& host) { return ConnectionPoolIndex[host].size(); };
std::thread m_cleanThread;
};

View File

@ -20,8 +20,8 @@ namespace Azure { namespace Core { namespace Http {
namespace _detail {
// libcurl CURL_MAX_WRITE_SIZE is 64k. Using same value for default uploading chunk size.
// This can be customizable in the HttpRequest
constexpr static int64_t DefaultUploadChunkSize = 1024 * 64;
constexpr static auto DefaultLibcurlReaderSize = 1024;
constexpr static size_t DefaultUploadChunkSize = 1024 * 64;
constexpr static size_t DefaultLibcurlReaderSize = 1024;
// Run time error template
constexpr static const char* DefaultFailedToGetNewConnectionTemplate
= "Fail to get a new connection for: ";
@ -35,7 +35,7 @@ namespace Azure { namespace Core { namespace Http {
constexpr static int32_t DefaultConnectionExpiredMilliseconds = 1000 * 60;
// Define the maximun allowed connections per host-index in the pool. If this number is reached
// for the host-index, next connections trying to be added to the pool will be ignored.
constexpr static size_t MaxConnectionsPerIndex = 1024;
constexpr static int32_t MaxConnectionsPerIndex = 1024;
} // namespace _detail
/**

View File

@ -146,7 +146,7 @@ namespace Azure { namespace Core { namespace Http {
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer.
*/
int64_t BuildStatusCode(uint8_t const* const buffer, int64_t const bufferSize);
size_t BuildStatusCode(uint8_t const* const buffer, size_t const bufferSize);
/**
* @brief This method is invoked by the Parsing process if the internal state is set to
@ -159,7 +159,7 @@ namespace Azure { namespace Core { namespace Http {
* value is smaller than the body size, means there is part of the body response in the
* buffer.
*/
int64_t BuildHeader(uint8_t const* const buffer, int64_t const bufferSize);
size_t BuildHeader(uint8_t const* const buffer, size_t const bufferSize);
public:
/**
@ -182,8 +182,9 @@ namespace Azure { namespace Core { namespace Http {
* Returning a value smaller than the buffer size will likely indicate that the HTTP
* RawResponse is completed and that the rest of the buffer contains part of the response
* body.
*
*/
int64_t Parse(uint8_t const* const buffer, int64_t const bufferSize);
size_t Parse(uint8_t const* const buffer, size_t const bufferSize);
/**
* @brief Indicates when the parser has completed parsing and building the HTTP RawResponse.
@ -238,8 +239,10 @@ namespace Azure { namespace Core { namespace Http {
* inner buffer. When a libcurl stream tries to read part of the body, this field will help to
* decide how much data to take from the inner buffer before pulling more data from network.
*
* @note The initial value is set to the size of the inner buffer as a sentinel that indicate
* that the buffer has not data or all data has already taken from it.
*/
int64_t m_bodyStartInBuffer = -1;
size_t m_bodyStartInBuffer = _detail::DefaultLibcurlReaderSize;
/**
* @brief Control field to handle the number of bytes containing relevant data within the
@ -247,7 +250,7 @@ namespace Azure { namespace Core { namespace Http {
* from wire into it, it can be holding less then N bytes.
*
*/
int64_t m_innerBufferSize = _detail::DefaultLibcurlReaderSize;
size_t m_innerBufferSize = _detail::DefaultLibcurlReaderSize;
bool m_isChunkedResponseType = false;
@ -264,12 +267,12 @@ namespace Azure { namespace Core { namespace Http {
/**
* @brief For chunked responses, this field knows the size of the current chuck size server
* will de sending
* will be sending.
*
*/
int64_t m_chunkSize = 0;
size_t m_chunkSize = 0;
int64_t m_sessionTotalRead = 0;
size_t m_sessionTotalRead = 0;
/**
* @brief Internal buffer from a session used to read bytes from a socket. This buffer is only
@ -333,7 +336,9 @@ namespace Azure { namespace Core { namespace Http {
*/
bool IsEOF()
{
auto eof = m_isChunkedResponseType ? m_chunkSize == 0 : m_contentLength == m_sessionTotalRead;
auto eof = m_isChunkedResponseType
? m_chunkSize == 0
: static_cast<size_t>(m_contentLength) == m_sessionTotalRead;
// `IsEOF` is called before trying to move a connection back to the connection pool.
// If the session state is `PERFORM` it means the request could not complete an upload

View File

@ -81,25 +81,31 @@ int main()
Azure::Core::Http::WinHttpTransportOptions winHttpOptions;
auto implementationClient = std::make_shared<Azure::Core::Http::WinHttpTransport>(winHttpOptions);
#endif
try
{
FaultInjectionClientOptions options;
options.m_url = Azure::Core::Url("https://localhost:7778");
options.m_transport = implementationClient;
FaultInjectionClient client(options);
FaultInjectionClientOptions options;
options.m_url = Azure::Core::Url("https://localhost:7778");
options.m_transport = implementationClient;
FaultInjectionClient client(options);
std::cout << "Sending request..." << std::endl;
std::cout << "Sending request..." << std::endl;
Azure::Core::Context context;
auto request = Azure::Core::Http::Request(
Azure::Core::Http::HttpMethod::Get, Azure::Core::Url("https://www.example.org"));
auto response = client.Send(request, context);
// Make sure to pull all bytes from network.
auto body = response->ExtractBodyStream()->ReadToEnd();
std::cout << "Status Code: "
<< static_cast<typename std::underlying_type<Azure::Core::Http::HttpStatusCode>::type>(
response->GetStatusCode())
<< std::endl;
Azure::Core::Context context;
auto request = Azure::Core::Http::Request(
Azure::Core::Http::HttpMethod::Get, Azure::Core::Url("https://www.example.org"));
auto response = client.Send(request, context);
// Make sure to pull all bytes from network.
auto body = response->ExtractBodyStream()->ReadToEnd();
std::cout
<< "Status Code: "
<< static_cast<typename std::underlying_type<Azure::Core::Http::HttpStatusCode>::type>(
response->GetStatusCode())
<< std::endl;
}
catch (std::exception const&)
{
std::cout << "Check fault injector server is running.";
}
return 0;
}

View File

@ -21,6 +21,7 @@ namespace Azure { namespace Core { namespace Test {
{
std::string response(
"HTTP/1.1 200 Ok\r\nContent-Type: text/html; charset=UTF-8\r\n\r\n{\r\n\"somejson\":45\r}");
int32_t const payloadSize = static_cast<int32_t>(response.size());
// Can't mock the curMock directly from a unique ptr, heap allocate it first and then make a
// unique ptr for it
@ -28,8 +29,8 @@ namespace Azure { namespace Core { namespace Test {
EXPECT_CALL(*curlMock, SendBuffer(_, _, _)).WillOnce(Return(CURLE_OK));
EXPECT_CALL(*curlMock, ReadFromSocket(_, _, _))
.WillOnce(DoAll(
SetArrayArgument<0>(response.data(), response.data() + response.size()),
Return(response.size())));
SetArrayArgument<0>(response.data(), response.data() + payloadSize),
Return(payloadSize)));
// Create the unique ptr to take care about memory free at the end
std::unique_ptr<MockCurlNetworkConnection> uniqueCurlMock(curlMock);
@ -51,6 +52,7 @@ namespace Azure { namespace Core { namespace Test {
// chunked response with no content and no size
std::string response("HTTP/1.1 200 Ok\r\ntransfer-encoding: chunked\r\n\r\n\n\r\n");
std::string connectionKey("connection-key");
int32_t const payloadSize = static_cast<int32_t>(response.size());
// Can't mock the curMock directly from a unique ptr, heap allocate it first and then make a
// unique ptr for it
@ -58,8 +60,8 @@ namespace Azure { namespace Core { namespace Test {
EXPECT_CALL(*curlMock, SendBuffer(_, _, _)).WillOnce(Return(CURLE_OK));
EXPECT_CALL(*curlMock, ReadFromSocket(_, _, _))
.WillOnce(DoAll(
SetArrayArgument<0>(response.data(), response.data() + response.size()),
Return(response.size())));
SetArrayArgument<0>(response.data(), response.data() + payloadSize),
Return(payloadSize)));
EXPECT_CALL(*curlMock, GetConnectionKey()).WillRepeatedly(ReturnRef(connectionKey));
EXPECT_CALL(*curlMock, UpdateLastUsageTime());
EXPECT_CALL(*curlMock, DestructObj());
@ -89,6 +91,8 @@ namespace Azure { namespace Core { namespace Test {
std::string response("HTTP/1.1 200 Ok\r\ntransfer-encoding: chunked\r\n\r\n9\r\n");
std::string response2("123456789\r\n0\r\n\rx");
std::string connectionKey("connection-key");
int32_t const payloadSize = static_cast<int32_t>(response.size());
int32_t const payloadSize2 = static_cast<int32_t>(response2.size());
// Can't mock the curMock directly from a unique ptr, heap allocate it first and then make a
// unique ptr for it
@ -96,11 +100,11 @@ namespace Azure { namespace Core { namespace Test {
EXPECT_CALL(*curlMock, SendBuffer(_, _, _)).WillOnce(Return(CURLE_OK));
EXPECT_CALL(*curlMock, ReadFromSocket(_, _, _))
.WillOnce(DoAll(
SetArrayArgument<0>(response.data(), response.data() + response.size()),
Return(response.size())))
SetArrayArgument<0>(response.data(), response.data() + payloadSize),
Return(payloadSize)))
.WillOnce(DoAll(
SetArrayArgument<0>(response2.data(), response2.data() + response2.size()),
Return(response2.size())));
SetArrayArgument<0>(response2.data(), response2.data() + payloadSize2),
Return(payloadSize2)));
EXPECT_CALL(*curlMock, GetConnectionKey()).WillRepeatedly(ReturnRef(connectionKey));
EXPECT_CALL(*curlMock, UpdateLastUsageTime());
EXPECT_CALL(*curlMock, DestructObj());
@ -144,6 +148,15 @@ namespace Azure { namespace Core { namespace Test {
std::string response6("123");
std::string response7("\r\n0\r\n");
std::string response8("\r\n");
int32_t const payloadSize0 = static_cast<int32_t>(response0.size());
int32_t const payloadSize1 = static_cast<int32_t>(response1.size());
int32_t const payloadSize2 = static_cast<int32_t>(response2.size());
int32_t const payloadSize3 = static_cast<int32_t>(response3.size());
int32_t const payloadSize4 = static_cast<int32_t>(response4.size());
int32_t const payloadSize5 = static_cast<int32_t>(response5.size());
int32_t const payloadSize6 = static_cast<int32_t>(response6.size());
int32_t const payloadSize7 = static_cast<int32_t>(response7.size());
int32_t const payloadSize8 = static_cast<int32_t>(response8.size());
std::string connectionKey("connection-key");
@ -153,32 +166,32 @@ namespace Azure { namespace Core { namespace Test {
EXPECT_CALL(*curlMock, SendBuffer(_, _, _)).WillOnce(Return(CURLE_OK));
EXPECT_CALL(*curlMock, ReadFromSocket(_, _, _))
.WillOnce(DoAll(
SetArrayArgument<0>(response0.data(), response0.data() + response0.size()),
Return(response0.size())))
SetArrayArgument<0>(response0.data(), response0.data() + payloadSize0),
Return(payloadSize0)))
.WillOnce(DoAll(
SetArrayArgument<0>(response1.data(), response1.data() + response1.size()),
Return(response1.size())))
SetArrayArgument<0>(response1.data(), response1.data() + payloadSize1),
Return(payloadSize1)))
.WillOnce(DoAll(
SetArrayArgument<0>(response2.data(), response2.data() + response2.size()),
Return(response2.size())))
SetArrayArgument<0>(response2.data(), response2.data() + payloadSize2),
Return(payloadSize2)))
.WillOnce(DoAll(
SetArrayArgument<0>(response3.data(), response3.data() + response3.size()),
Return(response3.size())))
SetArrayArgument<0>(response3.data(), response3.data() + payloadSize3),
Return(payloadSize3)))
.WillOnce(DoAll(
SetArrayArgument<0>(response4.data(), response4.data() + response4.size()),
Return(response4.size())))
SetArrayArgument<0>(response4.data(), response4.data() + payloadSize4),
Return(payloadSize4)))
.WillOnce(DoAll(
SetArrayArgument<0>(response5.data(), response5.data() + response5.size()),
Return(response5.size())))
SetArrayArgument<0>(response5.data(), response5.data() + payloadSize5),
Return(payloadSize5)))
.WillOnce(DoAll(
SetArrayArgument<0>(response6.data(), response6.data() + response6.size()),
Return(response6.size())))
SetArrayArgument<0>(response6.data(), response6.data() + payloadSize6),
Return(payloadSize6)))
.WillOnce(DoAll(
SetArrayArgument<0>(response7.data(), response7.data() + response7.size()),
Return(response7.size())))
SetArrayArgument<0>(response7.data(), response7.data() + payloadSize7),
Return(payloadSize7)))
.WillOnce(DoAll(
SetArrayArgument<0>(response8.data(), response8.data() + response8.size()),
Return(response8.size())));
SetArrayArgument<0>(response8.data(), response8.data() + payloadSize8),
Return(payloadSize8)));
EXPECT_CALL(*curlMock, GetConnectionKey()).WillRepeatedly(ReturnRef(connectionKey));
EXPECT_CALL(*curlMock, UpdateLastUsageTime());
EXPECT_CALL(*curlMock, DestructObj());