Make sure Read will always start on body start. (#256)

* make stream use int64_t instead of uint64_t
This commit is contained in:
Victor Vazquez 2020-07-10 23:02:18 -07:00 committed by GitHub
parent 43dcc6c495
commit c699888daa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 506 additions and 252 deletions

View File

@ -34,7 +34,7 @@ if(BUILD_TESTING)
# tests
include(AddGoogleTest)
enable_testing ()
add_subdirectory(sdk/core/azure-core/test)
add_subdirectory(sdk/core/azure-core/test/ut)
endif()
# compiler warning flags globally
@ -45,5 +45,7 @@ include(${CMAKE_SOURCE_DIR}/cmake-modules/doxygen_common.cmake)
# sub-projects
add_subdirectory(sdk/core/azure-core)
add_subdirectory(sdk/samples/http_client/curl) # will work only if BUILD_CURL_TRANSPORT=ON
if(BUILD_CURL_TRANSPORT)
add_subdirectory(sdk/core/azure-core/test/e2e) # will work only if BUILD_CURL_TRANSPORT=ON
endif()
add_subdirectory(sdk/storage)

View File

@ -3,13 +3,14 @@
#pragma once
#include <string>
#include <internal/contract.hpp>
#include <string>
#define AZURE_UNREFERENCED_PARAMETER(x) ((void) (x));
#define AZURE_UNREFERENCED_PARAMETER(x) ((void)(x));
namespace Azure { namespace Core { namespace Details {
bool LocaleInvariantCaseInsensitiveEqual(const std::string& lhs, const std::string& rhs) noexcept;
bool LocaleInvariantCaseInsensitiveEqual(const std::string& lhs, const std::string& rhs) noexcept;
std::string const ToLower(const std::string& src) noexcept;
}}} // namespace Azure::Core::Details

View File

@ -14,7 +14,8 @@
namespace Azure { namespace Core { namespace Http {
constexpr auto UploadStreamPageSize = 1024;
// libcurl CURL_MAX_WRITE_SIZE is 16k.
constexpr auto UploadStreamPageSize = 1024 * 64;
constexpr auto LibcurlReaderSize = 1024;
/**
@ -43,18 +44,6 @@ namespace Azure { namespace Core { namespace Http {
EndOfHeaders,
};
/**
* @brief Defines the strategy to read the body from an HTTP Response
*
*/
enum class ResponseBodyLengthType
{
ContentLength,
Chunked,
ReadToCloseConnection,
NoBody,
};
/**
* @brief stateful component used to read and parse a buffer to construct a valid HTTP Response.
*
@ -87,6 +76,8 @@ namespace Azure { namespace Core { namespace Http {
*/
bool m_parseCompleted;
bool m_delimiterStartInPrevPosition;
/**
* @brief This buffer is used when the parsed buffer doesn't contain a completed token. The
* content from the buffer will be appended to this buffer. Once that a delimiter is found,
@ -135,6 +126,7 @@ namespace Azure { namespace Core { namespace Http {
{
state = ResponseParserState::StatusLine;
this->m_parseCompleted = false;
this->m_delimiterStartInPrevPosition = false;
}
// Parse contents of buffer to construct HttpResponse. Returns the index of the last parsed
@ -207,7 +199,7 @@ namespace Azure { namespace Core { namespace Http {
* an offset to move the pointer to read the body from the HTTP Request on each callback.
*
*/
size_t uploadedBytes;
int64_t m_uploadedBytes;
/**
* @brief Control field that gets true as soon as there is no more data to read from network. A
@ -232,11 +224,7 @@ namespace Azure { namespace Core { namespace Http {
*/
int64_t m_innerBufferSize;
/**
* @brief Defines the strategy to read a body from an HTTP Response
*
*/
ResponseBodyLengthType m_bodyLengthType;
bool m_isChunkedResponseType;
/**
* @brief This is a copy of the value of an HTTP response header `content-length`. The value is
@ -326,6 +314,7 @@ namespace Azure { namespace Core { namespace Http {
* @return CURL_OK when response is sent successfully.
*/
CURLcode HttpRawSend(Context& context);
CURLcode UploadBody(Context& context);
/**
* @brief This method will use libcurl socket to write all the bytes from buffer.
@ -344,7 +333,7 @@ namespace Azure { namespace Core { namespace Http {
*
* @return CURL_OK when an HTTP response is created.
*/
CURLcode ReadStatusLineAndHeadersFromRawResponse();
void ReadStatusLineAndHeadersFromRawResponse();
/**
* @brief This function is used when working with streams to pull more data from the wire.
@ -367,10 +356,15 @@ namespace Azure { namespace Core { namespace Http {
CurlSession(Request& request) : m_request(request)
{
this->m_pCurl = curl_easy_init();
this->m_bodyStartInBuffer = 0;
this->m_bodyStartInBuffer = -1;
this->m_innerBufferSize = LibcurlReaderSize;
this->m_rawResponseEOF = false;
this->m_isChunkedResponseType = false;
this->m_uploadedBytes = 0;
}
~CurlSession() override { curl_easy_cleanup(this->m_pCurl); }
/**
* @brief Function will use the HTTP request received in constutor to perform a network call
* based on the HTTP request configuration.
@ -388,19 +382,7 @@ namespace Azure { namespace Core { namespace Http {
*/
std::unique_ptr<Azure::Core::Http::Response> GetResponse();
int64_t Length() const override
{
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked
|| this->m_bodyLengthType == ResponseBodyLengthType::ReadToCloseConnection)
{
return -1;
}
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked)
{
return 0;
}
return this->m_contentLength;
}
int64_t Length() const override { return this->m_contentLength; }
void Rewind() override {}

View File

@ -315,6 +315,7 @@ namespace Azure { namespace Core { namespace Http {
void AddHeader(std::string const& name, std::string const& value);
// rfc form header-name: OWS header-value OWS
void AddHeader(std::string const& header);
void AddHeader(uint8_t const* const begin, uint8_t const* const last);
void SetBodyStream(std::unique_ptr<BodyStream> stream);
// adding getters for version and stream body. Clang will complain on Mac if we have unused

View File

@ -43,10 +43,10 @@ CURLcode CurlSession::Perform(Context& context)
AZURE_UNREFERENCED_PARAMETER(context);
// Working with Body Buffer. let Libcurl use the classic callback to read/write
auto settingUp = SetUrl();
if (settingUp != CURLE_OK)
auto result = SetUrl();
if (result != CURLE_OK)
{
return settingUp;
return result;
}
// Make sure host is set
@ -60,53 +60,88 @@ CURLcode CurlSession::Perform(Context& context)
}
}
settingUp = SetConnectOnly();
if (settingUp != CURLE_OK)
result = SetConnectOnly();
if (result != CURLE_OK)
{
return settingUp;
return result;
}
// stablish connection only (won't send or receive nothing yet)
settingUp = curl_easy_perform(this->m_pCurl);
if (settingUp != CURLE_OK)
//curl_easy_setopt(this->m_pCurl, CURLOPT_VERBOSE, 1L);
// Set timeout to 24h. Libcurl will fail uploading on windows if timeout is:
// timeout >= 25 days. Fails as soon as trying to upload any data
// 25 days < timeout > 1 days. Fail on huge uploads ( > 1GB)
curl_easy_setopt(this->m_pCurl, CURLOPT_TIMEOUT, 60L * 60L * 24L);
// use expect:100 for PUT requests. Server will decide if it can take our request
if (this->m_request.GetMethod() == HttpMethod::Put) {
this->m_request.AddHeader("expect", "100-continue");
}
// establish connection only (won't send or receive anything yet)
result = curl_easy_perform(this->m_pCurl);
if (result != CURLE_OK)
{
return settingUp;
return result;
}
// Record socket to be used
settingUp = curl_easy_getinfo(this->m_pCurl, CURLINFO_ACTIVESOCKET, &this->m_curlSocket);
if (settingUp != CURLE_OK)
result = curl_easy_getinfo(this->m_pCurl, CURLINFO_ACTIVESOCKET, &this->m_curlSocket);
if (result != CURLE_OK)
{
return settingUp;
return result;
}
// Send request
result = HttpRawSend(context);
if (result != CURLE_OK)
{
return result;
}
ReadStatusLineAndHeadersFromRawResponse();
// Upload body for PUT
if (this->m_request.GetMethod() != HttpMethod::Put)
{
return result;
}
// Send request
settingUp = HttpRawSend(context);
if (settingUp != CURLE_OK)
// Check server response from Expect:100-continue for PUT;
// This help to prevent us from start uploading data when Server can't handle it
if (this->m_response->GetStatusCode() != HttpStatusCode::Continue)
{
return settingUp;
return result; // Won't upload.
}
this->m_rawResponseEOF = false; // Control EOF for response;
return ReadStatusLineAndHeadersFromRawResponse();
// Start upload
result = this->UploadBody(context);
if (result != CURLE_OK) {
return result; // will throw trnasport exception before trying to read
}
ReadStatusLineAndHeadersFromRawResponse();
return result;
}
// Creates an HTTP Response with specific bodyType
static std::unique_ptr<Response> CreateHTTPResponse(std::string const& header)
static std::unique_ptr<Response> CreateHTTPResponse(
uint8_t const* const begin,
uint8_t const* const last)
{
// set response code, http version and reason phrase (i.e. HTTP/1.1 200 OK)
auto start = header.begin() + 5; // HTTP = 4, / = 1, moving to 5th place for version
auto end = std::find(start, header.end(), '.');
auto start = begin + 5; // HTTP = 4, / = 1, moving to 5th place for version
auto end = std::find(start, last, '.');
auto majorVersion = std::stoi(std::string(start, end));
start = end + 1; // start of minor version
end = std::find(start, header.end(), ' ');
end = std::find(start, last, ' ');
auto minorVersion = std::stoi(std::string(start, end));
start = end + 1; // start of status code
end = std::find(start, header.end(), ' ');
end = std::find(start, last, ' ');
auto statusCode = std::stoi(std::string(start, end));
start = end + 1; // start of reason phrase
end = std::find(start, header.end(), '\r');
end = std::find(start, last, '\r');
auto reasonPhrase = std::string(start, end); // remove \r
// allocate the instance of response to heap with shared ptr
@ -116,6 +151,14 @@ static std::unique_ptr<Response> CreateHTTPResponse(std::string const& header)
(uint16_t)majorVersion, (uint16_t)minorVersion, HttpStatusCode(statusCode), reasonPhrase);
}
// Creates an HTTP Response with specific bodyType
static std::unique_ptr<Response> CreateHTTPResponse(std::string const& header)
{
return CreateHTTPResponse(
reinterpret_cast<const uint8_t* const>(header.data()),
reinterpret_cast<const uint8_t* const>(header.data() + header.size()));
}
// To wait for a socket to be ready to be read/write
static int WaitForSocketReady(curl_socket_t sockfd, int for_recv, long timeout_ms)
{
@ -165,78 +208,94 @@ CURLcode CurlSession::SetConnectOnly()
// Send buffer thru the wire
CURLcode CurlSession::SendBuffer(uint8_t const* buffer, size_t bufferSize)
{
if (bufferSize <= 0)
for (size_t sentBytesTotal = 0; sentBytesTotal < bufferSize;)
{
return CURLE_OK;
for (CURLcode sendResult = CURLE_AGAIN; sendResult == CURLE_AGAIN;)
{
size_t sentBytesPerRequest = 0;
sendResult = curl_easy_send(
this->m_pCurl,
buffer + sentBytesTotal,
bufferSize - sentBytesTotal,
&sentBytesPerRequest);
switch (sendResult)
{
case CURLE_OK:
sentBytesTotal += sentBytesPerRequest;
this->m_uploadedBytes += sentBytesPerRequest;
break;
case CURLE_AGAIN:
if (!WaitForSocketReady(this->m_curlSocket, 0, 60000L))
{
// TODO: Change this to something more relevant
throw;
}
break;
default:
return sendResult;
}
};
}
size_t sentBytesTotal = 0;
CURLcode sendResult;
return CURLE_OK;
}
do
CURLcode CurlSession::UploadBody(Context& context) {
// Send body UploadStreamPageSize at a time (libcurl default)
// NOTE: if stream is on top a contiguous memory, we can avoid allocating this copying buffer
auto unique_buffer = std::make_unique<uint8_t[]>(UploadStreamPageSize);
auto streamBody = this->m_request.GetBodyStream();
CURLcode sendResult = CURLE_OK;
// reusing rawRequestLen variable to read
this->m_uploadedBytes = 0;
while (true)
{
size_t sentBytesPerRequest;
do
auto rawRequestLen = streamBody->Read(context, unique_buffer.get(), UploadStreamPageSize);
if (rawRequestLen == 0)
{
sentBytesPerRequest = 0;
auto sendFrom = buffer + sentBytesTotal;
auto remainingBytes = bufferSize - sentBytesTotal;
sendResult = curl_easy_send(this->m_pCurl, sendFrom, remainingBytes, &sentBytesPerRequest);
sentBytesTotal += sentBytesPerRequest;
if (sendResult == CURLE_AGAIN && !WaitForSocketReady(this->m_curlSocket, 0, 60000L))
{
throw;
}
} while (sendResult == CURLE_AGAIN);
break;
}
sendResult = SendBuffer(unique_buffer.get(), static_cast<size_t>(rawRequestLen));
if (sendResult != CURLE_OK)
{
return sendResult;
}
} while (sentBytesTotal < bufferSize);
return CURLE_OK;
}
return sendResult;
}
// custom sending to wire an http request
CURLcode CurlSession::HttpRawSend(Context& context)
{
// something like GET /path HTTP1.0 \r\nheaders\r\n
auto rawRequest = this->m_request.GetHTTPMessagePreBody();
int64_t rawRequestLen = rawRequest.size();
CURLcode sendResult = SendBuffer(
reinterpret_cast<uint8_t const*>(rawRequest.data()), static_cast<size_t>(rawRequestLen));
if (sendResult != CURLE_OK || this->m_request.GetMethod() == HttpMethod::Put)
{
return sendResult;
}
auto streamBody = this->m_request.GetBodyStream();
if (streamBody->Length() == 0)
{
// Finish request with no body
uint8_t const endOfRequest[] = "0";
return SendBuffer(endOfRequest, 1); // need one more byte to end request
// Finish request with no body (Head or PUT (expect:100;)
uint8_t const endOfRequest = 0;
return SendBuffer(&endOfRequest, 1); // need one more byte to end request
}
// Send body 64k at a time (libcurl default)
// NOTE: if stream is on top a contiguous memory, we can avoid allocating this copying buffer
std::unique_ptr<uint8_t[]> unique_buffer(new uint8_t[UploadStreamPageSize]);
auto buffer = unique_buffer.get();
while (rawRequestLen > 0)
{
rawRequestLen = streamBody->Read(context, buffer, UploadStreamPageSize);
sendResult = SendBuffer(buffer, static_cast<size_t>(rawRequestLen));
}
return sendResult;
return this->UploadBody(context);
}
// Read status line plus headers to create a response with no body
CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
{
auto parser = ResponseBufferParser();
auto bufferSize = int64_t();
// Select a default reading strategy. // No content-length or Transfer-Encoding
this->m_bodyLengthType = ResponseBodyLengthType::ReadToCloseConnection;
// Keep reading until all headers were read
while (!parser.IsParseCompleted())
@ -245,12 +304,12 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// If response is smaller than buffer, we will get back the size of the response
bufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
// parse from buffer to create response
// returns the number of bytes parsed up to the body Start
auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
// if end of headers is reach before the end of response, that's where body start
if (bytesParsed + 2 < bufferSize)
if (bytesParsed < bufferSize)
{
this->m_bodyStartInBuffer = bytesParsed + 1; // Set the start of body (skip \r)
this->m_bodyStartInBuffer = bytesParsed; // Body Start
}
}
@ -258,27 +317,27 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
this->m_innerBufferSize = static_cast<size_t>(bufferSize);
// For Head request, set the length of body response to 0.
// Response will give us content-length as if we were not doing Head saying what would it be the
// length of the body. However, Server won't send body
if (this->m_request.GetMethod() == HttpMethod::Head)
{
this->m_bodyLengthType = ResponseBodyLengthType::NoBody;
return CURLE_OK;
this->m_contentLength = 0;
this->m_rawResponseEOF = true;
return;
}
// TODO: tolower ContentLength
// headers are already loweCase at this point
auto headers = this->m_response->GetHeaders();
auto isContentLengthHeaderInResponse = headers.find("Content-Length");
if (isContentLengthHeaderInResponse != headers.end())
{
// Response with Content-Length
auto bodySize = std::stoull(headers.at("Content-Length").data());
// content-length is used later by session and session won't have access to the response any
// more (unique_ptr), so we save this value
this->m_contentLength = bodySize;
this->m_bodyLengthType = ResponseBodyLengthType::ContentLength;
return CURLE_OK;
this->m_contentLength
= static_cast<int64_t>(std::stoull(isContentLengthHeaderInResponse->second.data()));
return;
}
this->m_contentLength = -1;
auto isTransferEncodingHeaderInResponse = headers.find("Transfer-Encoding");
if (isTransferEncodingHeaderInResponse != headers.end())
{
@ -289,11 +348,42 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
{
// set curl session to know response is chunked
// This will be used to remove chunked info while reading
this->m_bodyLengthType = ResponseBodyLengthType::Chunked;
return CURLE_OK;
this->m_isChunkedResponseType = true;
// Need to move body start after chunk size
if (this->m_bodyStartInBuffer == -1)
{ // if nothing on inner buffer, pull from wire
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
for (bool keepPolling = true; keepPolling;)
{
for (int64_t index = this->m_bodyStartInBuffer; index < this->m_innerBufferSize; index++)
{
if (index > 0 && this->m_readBuffer[index] == '\n')
{
if (index + 1 == bufferSize)
{ // on last index. Whatever we read is the BodyStart here
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
else
{ // not at the end, buffer like [999 \r\nBody...]
this->m_bodyStartInBuffer = index + 1;
}
keepPolling = false;
break;
}
}
if (keepPolling)
{ // Read all internal buffer and \n was not found, pull from wire
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
}
}
return;
}
}
/*
https://tools.ietf.org/html/rfc7230#section-3.3.3
7. Otherwise, this is a response message without a declared message
@ -301,111 +391,67 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
number of octets received prior to the server closing the
connection.
*/
// Use unknown size CurlBodyStream. CurlSession will use the ResponseBodyLengthType to select a
// reading strategy
this->m_bodyLengthType = ResponseBodyLengthType::ReadToCloseConnection;
return CURLE_OK;
}
// Read from curl session
int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count)
{
context.ThrowIfCanceled();
auto totalRead = int64_t();
// Take data from inner buffer if any
if (this->m_bodyStartInBuffer > 0)
if (count <= 0)
{
if (this->m_readBuffer[this->m_bodyStartInBuffer] == '\n' && this->m_sessionTotalRead == 0)
{
// first read. Need to move to next position
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked)
{
// For chunked, first advance until next `\r` after chunked size (\nsomeNumber\r\n)
auto nextPosition = std::find(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_readBuffer + this->m_innerBufferSize,
'\r');
if (nextPosition != this->m_readBuffer + this->m_innerBufferSize)
{
// Found possition of next '\r', +1 jumps the \r
this->m_bodyStartInBuffer
+= std::distance(this->m_readBuffer + this->m_bodyStartInBuffer, nextPosition) + 1;
// Check if the end of body is also at inner buffer
auto endOfChunk = std::find(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_readBuffer + this->m_innerBufferSize,
'\r');
if (endOfChunk != this->m_readBuffer + this->m_innerBufferSize)
{
this->m_innerBufferSize
-= std::distance(endOfChunk, this->m_readBuffer + this->m_innerBufferSize);
}
} // TODO: else read from wire until next \r
}
this->m_bodyStartInBuffer += 1;
}
if (this->m_bodyStartInBuffer < this->m_innerBufferSize)
{
// still have data to take from innerbuffer
MemoryBodyStream innerBufferMemoryStream(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_innerBufferSize - this->m_bodyStartInBuffer);
totalRead = innerBufferMemoryStream.Read(context, buffer, count);
this->m_bodyStartInBuffer += totalRead;
this->m_sessionTotalRead += totalRead;
if (this->m_bodyStartInBuffer == this->m_innerBufferSize)
{
this->m_bodyStartInBuffer = 0; // read everyting from inner buffer already
}
return totalRead;
}
// After moving the reading start we reached the end
this->m_bodyStartInBuffer = 0;
}
// if the last position in inner buffer is `\r` it means the next
// thing we read from wire is `\n`. (usually this is when reading 1byte per time from wire)
if (this->m_readBuffer[this->m_innerBufferSize - 1] == '\r')
{
// Read one possition from socket on same user buffer, We wil override the value after this
ReadSocketToBuffer(buffer, 1);
}
if (this->m_bodyLengthType == ResponseBodyLengthType::ContentLength
&& this->m_sessionTotalRead == this->m_contentLength)
{
// Read everything already
curl_easy_cleanup(this->m_pCurl);
// LimitStream would try to read 0 bytes
return 0;
}
// Read from socket
auto totalRead = int64_t();
// Take data from inner buffer if any
if (this->m_bodyStartInBuffer >= 0)
{
// still have data to take from innerbuffer
MemoryBodyStream innerBufferMemoryStream(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_innerBufferSize - this->m_bodyStartInBuffer);
totalRead = innerBufferMemoryStream.Read(context, buffer, count);
this->m_bodyStartInBuffer += totalRead;
this->m_sessionTotalRead += totalRead;
if (this->m_bodyStartInBuffer == this->m_innerBufferSize)
{
this->m_bodyStartInBuffer = -1; // read everyting from inner buffer already
}
return totalRead;
}
// Head request have contentLength = 0, so we won't read more, just return 0
// Also if we have already read all contentLength
if (this->m_sessionTotalRead == this->m_contentLength || this->m_rawResponseEOF)
{
// Read everything already
return 0;
}
// Read from socket when no more data on internal buffer
totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(count));
this->m_sessionTotalRead += totalRead;
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked && totalRead > 0)
if (this->m_isChunkedResponseType && totalRead > 0)
{
// Check if the end of chunked is part of the body
auto endOfBody = std::find(buffer, buffer + totalRead, '\r');
if (endOfBody != buffer + totalRead)
{
// End of stream is when chunk is 0 (0\r\n)
if (buffer[0] == '0' && buffer + 1 == endOfBody)
{
// got already the end
curl_easy_cleanup(this->m_pCurl);
// got already the end. Usually when all body was at innerBuffer and next pull from wire
// returns only the end of chunk
return 0;
}
// Read all remaining to close connection
{
constexpr int64_t finalRead = 50; // usually only 5 more bytes are gotten "0\r\n\r\n"
uint8_t b[finalRead];
ReadSocketToBuffer(b, finalRead);
curl_easy_cleanup(this->m_pCurl);
}
totalRead -= std::distance(endOfBody, buffer + totalRead);
this->m_rawResponseEOF = true;
}
}
return totalRead;
@ -414,23 +460,28 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
// Read from socket and return the number of bytes taken from socket
int64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize)
{
CURLcode readResult;
// loop until read result is not CURLE_AGAIN
size_t readBytes = 0;
do // try to read from socket until response is OK
for (CURLcode readResult = CURLE_AGAIN; readResult == CURLE_AGAIN;)
{
readResult = curl_easy_recv(this->m_pCurl, buffer, static_cast<size_t>(bufferSize), &readBytes);
if (readResult == CURLE_AGAIN)
{
readResult = CURLE_AGAIN;
}
// socket not ready. Wait or fail on timeout
if (readResult == CURLE_AGAIN && !WaitForSocketReady(this->m_curlSocket, 1, 60000L))
{
throw;
}
} while (readResult == CURLE_AGAIN); // Keep trying to read until result is not CURLE_AGAIN
switch (readResult)
{
case CURLE_AGAIN:
if (!WaitForSocketReady(this->m_curlSocket, 0, 60000L))
{
// TODO: Change this to somehing more relevant
throw;
}
break;
case CURLE_OK:
break;
default:
// Error code while reading from socket
return -1;
}
}
return readBytes;
}
@ -448,32 +499,113 @@ int64_t CurlSession::ResponseBufferParser::Parse(
return 0;
}
switch (this->state)
// Read all buffer until \r\n is found
int64_t start = 0, index = 0;
for (; index < bufferSize; index++)
{
case ResponseParserState::StatusLine: {
auto parsedBytes = BuildStatusCode(buffer, bufferSize);
if (parsedBytes < bufferSize) // status code is built and buffer can be still parsed
{
// Can keep parsing, Control have moved to headers
return parsedBytes + Parse(buffer + parsedBytes, bufferSize - parsedBytes);
}
return parsedBytes;
if (buffer[index] == '\r')
{
this->m_delimiterStartInPrevPosition = true;
continue;
}
case ResponseParserState::Headers: {
auto parsedBytes = BuildHeader(buffer, bufferSize);
if (!this->m_parseCompleted
&& parsedBytes < bufferSize) // status code is built and buffer can be still parsed
if (buffer[index] == '\n' && this->m_delimiterStartInPrevPosition)
{
// found end of delimiter
if (this->m_internalBuffer.size() > 0) // Check internal buffer
{
// Can keep parsing, Control have moved to headers
return parsedBytes + Parse(buffer + parsedBytes, bufferSize - parsedBytes);
// At this point, we are reading to append more to internal buffer.
// Only append more if index is greater than 1, meaning not when buffer is [\r\nxxx]
// only on buffer like [xxx\r\n yyyy], append xxx
if (index > 1)
{
// Previously appended something
this->m_internalBuffer.append(buffer + start, buffer + index - 1); // minus 1 to remove \r
}
if (this->state == ResponseParserState::StatusLine)
{
// Create Response
this->m_response = CreateHTTPResponse(this->m_internalBuffer);
// Set state to headers
this->state = ResponseParserState::Headers;
this->m_delimiterStartInPrevPosition = false;
start = index + 1; // jump \n
}
else if (this->state == ResponseParserState::Headers)
{
// Add header. TODO: Do toLower so all headers are lowerCase
this->m_response->AddHeader(this->m_internalBuffer);
this->m_delimiterStartInPrevPosition = false;
start = index + 1; // jump \n
}
else
{
// Should never happen that parser is not statusLIne or Headers and we still try to parse
// more.
throw;
}
// clean internal buffer
this->m_internalBuffer.clear();
}
else
{
// Nothing at internal buffer. Add directly from internal buffer
if (this->state == ResponseParserState::StatusLine)
{
// Create Response
this->m_response = CreateHTTPResponse(buffer + start, buffer + index - 1);
// Set state to headers
this->state = ResponseParserState::Headers;
this->m_delimiterStartInPrevPosition = false;
start = index + 1; // jump \n
}
else if (this->state == ResponseParserState::Headers)
{
// Check if this is end of headers delimiter
// 1) internal buffer is empty and \n is the first char on buffer [\nBody...]
// 2) index == start + 1. No header data after last \r\n [header\r\n\r\n]
if (index == 0 || index == start + 1)
{
this->m_parseCompleted = true;
return index + 1; // plus 1 to advance the \n. If we were at buffer end.
}
// Add header. TODO: Do toLower so all headers are lowerCase
this->m_response->AddHeader(buffer + start, buffer + index - 1);
this->m_delimiterStartInPrevPosition = false;
start = index + 1; // jump \n
}
else
{
// Should never happen that parser is not statusLIne or Headers and we still try to parse
// more.
throw;
}
}
return parsedBytes;
}
case ResponseParserState::EndOfHeaders:
default: {
return 0;
else
{
if (index == 0 && this->m_internalBuffer.size() > 0 && this->m_delimiterStartInPrevPosition)
{
// unlikely. But this means a case with buffers like [xx\r], [xxxx]
// \r is not delimiter and in previous loop it was omitted, so adding it now
this->m_internalBuffer.append("\r");
}
// \r in the response without \n after it. keep parsing
this->m_delimiterStartInPrevPosition = false;
}
}
if (start < bufferSize)
{
// didn't find the end of delimiter yet, save at internal buffer
// If this->m_delimiterStartInPrevPosition is true, buffer ends in \r [xxxx\r]
// Don't add \r. IF next char is not \n, we will append \r then on next loop
this->m_internalBuffer.append(
buffer + start, buffer + bufferSize - (this->m_delimiterStartInPrevPosition ? 1 : 0));
}
return index;
}
// Finds delimiter '\r' as the end of the

View File

@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include <azure.hpp>
#include <cctype>
#include <http/http.hpp>
#include <cctype>
@ -16,35 +18,42 @@ std::string const& Response::GetReasonPhrase() { return m_reasonPhrase; }
std::map<std::string, std::string> const& Response::GetHeaders() const { return this->m_headers; }
void Response::AddHeader(std::string const& header)
void Response::AddHeader(uint8_t const* const begin, uint8_t const* const last)
{
// get name and value from header
auto start = header.begin();
auto end = std::find(start, header.end(), ':');
auto start = begin;
auto end = std::find(start, last, ':');
if (end == header.end())
if (end == last)
{
return; // not a valid header or end of headers symbol reached
}
// Always toLower() headers
// auto headerName = Azure::Core::Details::ToLower(std::string(start, end));
auto headerName = std::string(start, end);
start = end + 1; // start value
while (start < header.end() && (*start == ' ' || *start == '\t'))
while (start < last && (*start == ' ' || *start == '\t'))
{
++start;
}
end = std::find(start, header.end(), '\r');
end = std::find(start, last, '\r');
auto headerValue = std::string(start, end); // remove \r
AddHeader(headerName, headerValue);
}
void Response::AddHeader(std::string const& header)
{
return AddHeader(
reinterpret_cast<uint8_t const* const>(header.data()),
reinterpret_cast<uint8_t const* const>(header.data() + header.size()));
}
void Response::AddHeader(std::string const& name, std::string const& value)
{
// TODO: make sure the Content-Length header is insterted as "Content-Length" no mather the case
// We currently assume we receive it like it and expected to be there from all HTTP
// Responses.
this->m_headers.insert(std::pair<std::string, std::string>(name, value));
}

View File

@ -65,6 +65,20 @@ const unsigned char c_LocaleInvariantUppercaseTable[256] = {
namespace Azure { namespace Core { namespace Details {
std::string const ToLower(const std::string& src) noexcept
{
auto result = std::string(src);
for (auto i = result.begin(); i < result.end(); i++)
{
*i = c_LocaleInvariantUppercaseTable[static_cast<unsigned char>(*i)];
if (*i >= 'A' && *i <= 'Z')
{
*i += 32;
}
}
return result;
}
bool LocaleInvariantCaseInsensitiveEqual(const std::string& lhs, const std::string& rhs) noexcept
{
return std::equal(

View File

@ -6,6 +6,8 @@ if (BUILD_CURL_TRANSPORT)
cmake_minimum_required (VERSION 3.12)
set(TARGET_NAME "azure_core_with_curl")
set(TARGET_NAME_STREAM "azure_core_with_curl_stream")
set(TARGET_NAME_STORAGE_ISSUE_249 "azure_core_storage_issue_249")
set(TARGET_NAME_STORAGE_ISSUE_248 "azure_core_storage_issue_248")
project(${TARGET_NAME} LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)
@ -13,15 +15,27 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
add_executable (
${TARGET_NAME}
src/azure_core_with_curl_bodyBuffer
azure_core_with_curl_bodyBuffer
)
add_executable (
${TARGET_NAME_STREAM}
src/azure_core_with_curl_bodyStream
azure_core_with_curl_bodyStream
)
add_executable (
${TARGET_NAME_STORAGE_ISSUE_249}
azure_core_storage_test_sample
)
add_executable (
${TARGET_NAME_STORAGE_ISSUE_248}
azure_core_storage_list_containers_sample
)
target_link_libraries(${TARGET_NAME} PRIVATE azure-core)
target_link_libraries(${TARGET_NAME_STREAM} PRIVATE azure-core)
target_link_libraries(${TARGET_NAME_STORAGE_ISSUE_249} PRIVATE azure-core)
target_link_libraries(${TARGET_NAME_STORAGE_ISSUE_248} PRIVATE azure-core)
endif()

View File

@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "http/pipeline.hpp"
#include <array>
#include <http/curl/curl.hpp>
#include <http/http.hpp>
#include <iostream>
#include <memory>
#include <vector>
using namespace Azure::Core;
using namespace Azure::Core::Http;
using namespace std;
int main()
{
// Create the Transport
std::shared_ptr<HttpTransport> transport = std::make_unique<CurlTransport>();
std::vector<std::unique_ptr<HttpPolicy>> policies;
// Add the transport policy
policies.push_back(std::make_unique<TransportPolicy>(std::move(transport)));
auto httpPipeline = Http::HttpPipeline(policies);
auto context = Context();
string host("http://anglesharp.azurewebsites.net/Chunked");
auto request = Http::Request(Http::HttpMethod::Get, host);
auto response = httpPipeline.Send(context, request);
auto response_bodystream = response->GetBodyStream();
auto response_body = BodyStream::ReadToEnd(context, *response_bodystream);
cout << response_body.data() << endl;
return 0;
}

View File

@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#ifdef _MSC_VER
#define _CRT_SECURE_NO_WARNINGS
#endif
#include "http/pipeline.hpp"
#include <array>
#include <http/curl/curl.hpp>
#include <http/http.hpp>
#include <iostream>
#include <memory>
#include <vector>
using namespace Azure::Core;
using namespace Azure::Core::Http;
using namespace std;
int main()
{
// Create the Transport
std::shared_ptr<HttpTransport> transport = std::make_unique<CurlTransport>();
std::vector<std::unique_ptr<HttpPolicy>> policies;
// Add the transport policy
policies.push_back(std::make_unique<TransportPolicy>(std::move(transport)));
auto httpPipeline = Http::HttpPipeline(policies);
auto context = Context();
// STORAGE_BLOB_WITH_SAS = like
// "https://account.windows.net/azure/container/blob?sv=...&ss=...&..."
string host(std::getenv("STORAGE_BLOB_WITH_SAS"));
std::vector<uint8_t> request_bodydata(500 * 1024 * 1024, '1');
cout << request_bodydata.size() << endl;
MemoryBodyStream requestBodyStream(request_bodydata.data(), request_bodydata.size());
auto request = Http::Request(Http::HttpMethod::Put, host, &requestBodyStream);
request.AddHeader("Content-Length", std::to_string(request_bodydata.size()));
request.AddHeader("x-ms-version", "2019-07-07");
request.AddHeader("x-ms-blob-type", "BlockBlob");
auto response = httpPipeline.Send(context, request);
auto bodyS = response->GetBodyStream();
auto body = BodyStream::ReadToEnd(context, *bodyS);
cout << body.data() << endl;
return 0;
}

View File

@ -54,10 +54,10 @@ int main()
auto context = Context();
// Both requests uses a body buffer to be uploaded that would produce responses with bodyBuffer
doHeadRequest(context, httpPipeline);
doFileRequest(context, httpPipeline);
doGetRequest(context, httpPipeline);
doPutRequest(context, httpPipeline);
doHeadRequest(context, httpPipeline);
doDeleteRequest(context, httpPipeline);
doPatchRequest(context, httpPipeline);
}

View File

@ -98,8 +98,8 @@ void doGetRequest(Context context, HttpPipeline& pipeline)
request.AddHeader("Host", "httpbin.org");
request.AddQueryParameter("dinamicArg", "3");
request.AddQueryParameter("dinamicArg2", "4");
request.AddQueryParameter("dynamicArg", "3");
request.AddQueryParameter("dynamicArg2", "4");
auto response = pipeline.Send(context, request);
printStream(context, std::move(response));
@ -155,9 +155,9 @@ void doPutStreamRequest(Context context, HttpPipeline& pipeline)
request.AddHeader("Content-Length", std::to_string(StreamSize));
request.AddQueryParameter("dinamicArg", "1");
request.AddQueryParameter("dinamicArg2", "1");
request.AddQueryParameter("dinamicArg3", "1");
request.AddQueryParameter("dynamicArg", "1");
request.AddQueryParameter("dynamicArg2", "1");
request.AddQueryParameter("dynamicArg3", "1");
printStream(context, std::move(pipeline.Send(context, request)));
}