From c699888daaef75ae89847cfc3b829f9639e49cd6 Mon Sep 17 00:00:00 2001 From: Victor Vazquez Date: Fri, 10 Jul 2020 23:02:18 -0700 Subject: [PATCH] Make sure Read will always start on body start. (#256) * make stream use int64_t instead of uint64_t --- CMakeLists.txt | 6 +- sdk/core/azure-core/inc/azure.hpp | 7 +- sdk/core/azure-core/inc/http/curl/curl.hpp | 50 +- sdk/core/azure-core/inc/http/http.hpp | 1 + sdk/core/azure-core/src/http/curl/curl.cpp | 524 +++++++++++------- sdk/core/azure-core/src/http/response.cpp | 27 +- sdk/core/azure-core/src/strings.cpp | 14 + .../azure-core/test/e2e}/CMakeLists.txt | 18 +- ...re_core_storage_list_containers_sample.cpp | 43 ++ .../e2e/azure_core_storage_test_sample.cpp | 56 ++ .../e2e}/azure_core_with_curl_bodyBuffer.cpp | 2 +- .../e2e}/azure_core_with_curl_bodyStream.cpp | 10 +- .../azure-core/test/{ => ut}/CMakeLists.txt | 0 sdk/core/azure-core/test/{ => ut}/http.cpp | 0 sdk/core/azure-core/test/{ => ut}/main.cpp | 0 .../azure-core/test/{ => ut}/nullable.cpp | 0 sdk/core/azure-core/test/{ => ut}/string.cpp | 0 17 files changed, 506 insertions(+), 252 deletions(-) rename sdk/{samples/http_client/curl => core/azure-core/test/e2e}/CMakeLists.txt (50%) create mode 100644 sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp create mode 100644 sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp rename sdk/{samples/http_client/curl/src => core/azure-core/test/e2e}/azure_core_with_curl_bodyBuffer.cpp (100%) rename sdk/{samples/http_client/curl/src => core/azure-core/test/e2e}/azure_core_with_curl_bodyStream.cpp (95%) rename sdk/core/azure-core/test/{ => ut}/CMakeLists.txt (100%) rename sdk/core/azure-core/test/{ => ut}/http.cpp (100%) rename sdk/core/azure-core/test/{ => ut}/main.cpp (100%) rename sdk/core/azure-core/test/{ => ut}/nullable.cpp (100%) rename sdk/core/azure-core/test/{ => ut}/string.cpp (100%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 41149eb25..4fa1e215b 100644 --- 
a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,7 @@ if(BUILD_TESTING) # tests include(AddGoogleTest) enable_testing () - add_subdirectory(sdk/core/azure-core/test) + add_subdirectory(sdk/core/azure-core/test/ut) endif() # compiler warning flags globally @@ -45,5 +45,7 @@ include(${CMAKE_SOURCE_DIR}/cmake-modules/doxygen_common.cmake) # sub-projects add_subdirectory(sdk/core/azure-core) -add_subdirectory(sdk/samples/http_client/curl) # will work only if BUILD_CURL_TRANSPORT=ON +if(BUILD_CURL_TRANSPORT) + add_subdirectory(sdk/core/azure-core/test/e2e) # will work only if BUILD_CURL_TRANSPORT=ON +endif() add_subdirectory(sdk/storage) diff --git a/sdk/core/azure-core/inc/azure.hpp b/sdk/core/azure-core/inc/azure.hpp index df9af305f..18779bfd5 100644 --- a/sdk/core/azure-core/inc/azure.hpp +++ b/sdk/core/azure-core/inc/azure.hpp @@ -3,13 +3,14 @@ #pragma once -#include #include +#include -#define AZURE_UNREFERENCED_PARAMETER(x) ((void) (x)); +#define AZURE_UNREFERENCED_PARAMETER(x) ((void)(x)); namespace Azure { namespace Core { namespace Details { -bool LocaleInvariantCaseInsensitiveEqual(const std::string& lhs, const std::string& rhs) noexcept; + bool LocaleInvariantCaseInsensitiveEqual(const std::string& lhs, const std::string& rhs) noexcept; + std::string const ToLower(const std::string& src) noexcept; }}} // namespace Azure::Core::Details diff --git a/sdk/core/azure-core/inc/http/curl/curl.hpp b/sdk/core/azure-core/inc/http/curl/curl.hpp index fa0386dcb..fdd884fa7 100644 --- a/sdk/core/azure-core/inc/http/curl/curl.hpp +++ b/sdk/core/azure-core/inc/http/curl/curl.hpp @@ -14,7 +14,8 @@ namespace Azure { namespace Core { namespace Http { - constexpr auto UploadStreamPageSize = 1024; + // libcurl CURL_MAX_WRITE_SIZE is 16k. 
+ constexpr auto UploadStreamPageSize = 1024 * 64; constexpr auto LibcurlReaderSize = 1024; /** @@ -43,18 +44,6 @@ namespace Azure { namespace Core { namespace Http { EndOfHeaders, }; - /** - * @brief Defines the strategy to read the body from an HTTP Response - * - */ - enum class ResponseBodyLengthType - { - ContentLength, - Chunked, - ReadToCloseConnection, - NoBody, - }; - /** * @brief stateful component used to read and parse a buffer to construct a valid HTTP Response. * @@ -87,6 +76,8 @@ namespace Azure { namespace Core { namespace Http { */ bool m_parseCompleted; + bool m_delimiterStartInPrevPosition; + /** * @brief This buffer is used when the parsed buffer doesn't contain a completed token. The * content from the buffer will be appended to this buffer. Once that a delimiter is found, @@ -135,6 +126,7 @@ namespace Azure { namespace Core { namespace Http { { state = ResponseParserState::StatusLine; this->m_parseCompleted = false; + this->m_delimiterStartInPrevPosition = false; } // Parse contents of buffer to construct HttpResponse. Returns the index of the last parsed @@ -207,7 +199,7 @@ namespace Azure { namespace Core { namespace Http { * an offset to move the pointer to read the body from the HTTP Request on each callback. * */ - size_t uploadedBytes; + int64_t m_uploadedBytes; /** * @brief Control field that gets true as soon as there is no more data to read from network. A @@ -232,11 +224,7 @@ namespace Azure { namespace Core { namespace Http { */ int64_t m_innerBufferSize; - /** - * @brief Defines the strategy to read a body from an HTTP Response - * - */ - ResponseBodyLengthType m_bodyLengthType; + bool m_isChunkedResponseType; /** * @brief This is a copy of the value of an HTTP response header `content-length`. The value is @@ -326,6 +314,7 @@ namespace Azure { namespace Core { namespace Http { * @return CURL_OK when response is sent successfully. 
*/ CURLcode HttpRawSend(Context& context); + CURLcode UploadBody(Context& context); /** * @brief This method will use libcurl socket to write all the bytes from buffer. @@ -344,7 +333,7 @@ namespace Azure { namespace Core { namespace Http { * * @return CURL_OK when an HTTP response is created. */ - CURLcode ReadStatusLineAndHeadersFromRawResponse(); + void ReadStatusLineAndHeadersFromRawResponse(); /** * @brief This function is used when working with streams to pull more data from the wire. @@ -367,10 +356,15 @@ namespace Azure { namespace Core { namespace Http { CurlSession(Request& request) : m_request(request) { this->m_pCurl = curl_easy_init(); - this->m_bodyStartInBuffer = 0; + this->m_bodyStartInBuffer = -1; this->m_innerBufferSize = LibcurlReaderSize; + this->m_rawResponseEOF = false; + this->m_isChunkedResponseType = false; + this->m_uploadedBytes = 0; } + ~CurlSession() override { curl_easy_cleanup(this->m_pCurl); } + /** * @brief Function will use the HTTP request received in constutor to perform a network call * based on the HTTP request configuration. 
@@ -388,19 +382,7 @@ namespace Azure { namespace Core { namespace Http { */ std::unique_ptr GetResponse(); - int64_t Length() const override - { - if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked - || this->m_bodyLengthType == ResponseBodyLengthType::ReadToCloseConnection) - { - return -1; - } - if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked) - { - return 0; - } - return this->m_contentLength; - } + int64_t Length() const override { return this->m_contentLength; } void Rewind() override {} diff --git a/sdk/core/azure-core/inc/http/http.hpp b/sdk/core/azure-core/inc/http/http.hpp index 44f65e45a..ed92e8269 100644 --- a/sdk/core/azure-core/inc/http/http.hpp +++ b/sdk/core/azure-core/inc/http/http.hpp @@ -315,6 +315,7 @@ namespace Azure { namespace Core { namespace Http { void AddHeader(std::string const& name, std::string const& value); // rfc form header-name: OWS header-value OWS void AddHeader(std::string const& header); + void AddHeader(uint8_t const* const begin, uint8_t const* const last); void SetBodyStream(std::unique_ptr stream); // adding getters for version and stream body. Clang will complain on Mac if we have unused diff --git a/sdk/core/azure-core/src/http/curl/curl.cpp b/sdk/core/azure-core/src/http/curl/curl.cpp index fc3544b51..2db962bf1 100644 --- a/sdk/core/azure-core/src/http/curl/curl.cpp +++ b/sdk/core/azure-core/src/http/curl/curl.cpp @@ -43,10 +43,10 @@ CURLcode CurlSession::Perform(Context& context) AZURE_UNREFERENCED_PARAMETER(context); // Working with Body Buffer. 
let Libcurl use the classic callback to read/write - auto settingUp = SetUrl(); - if (settingUp != CURLE_OK) + auto result = SetUrl(); + if (result != CURLE_OK) { - return settingUp; + return result; } // Make sure host is set @@ -60,53 +60,88 @@ CURLcode CurlSession::Perform(Context& context) } } - settingUp = SetConnectOnly(); - if (settingUp != CURLE_OK) + result = SetConnectOnly(); + if (result != CURLE_OK) { - return settingUp; + return result; } - // stablish connection only (won't send or receive nothing yet) - settingUp = curl_easy_perform(this->m_pCurl); - if (settingUp != CURLE_OK) + //curl_easy_setopt(this->m_pCurl, CURLOPT_VERBOSE, 1L); + // Set timeout to 24h. Libcurl will fail uploading on windows if timeout is: + // timeout >= 25 days. Fails as soon as trying to upload any data + // 25 days < timeout > 1 days. Fail on huge uploads ( > 1GB) + curl_easy_setopt(this->m_pCurl, CURLOPT_TIMEOUT, 60L * 60L * 24L); + + // use expect:100 for PUT requests. Server will decide if it can take our request + if (this->m_request.GetMethod() == HttpMethod::Put) { + this->m_request.AddHeader("expect", "100-continue"); + } + + + // establish connection only (won't send or receive anything yet) + result = curl_easy_perform(this->m_pCurl); + if (result != CURLE_OK) { - return settingUp; + return result; } // Record socket to be used - settingUp = curl_easy_getinfo(this->m_pCurl, CURLINFO_ACTIVESOCKET, &this->m_curlSocket); - if (settingUp != CURLE_OK) + result = curl_easy_getinfo(this->m_pCurl, CURLINFO_ACTIVESOCKET, &this->m_curlSocket); + if (result != CURLE_OK) { - return settingUp; + return result; + } + + // Send request + result = HttpRawSend(context); + if (result != CURLE_OK) + { + return result; + } + + ReadStatusLineAndHeadersFromRawResponse(); + + // Upload body for PUT + if (this->m_request.GetMethod() != HttpMethod::Put) + { + return result; } - // Send request - settingUp = HttpRawSend(context); - if (settingUp != CURLE_OK) + // Check server response from 
Expect:100-continue for PUT; + // This helps to prevent us from starting to upload data when Server can't handle it + if (this->m_response->GetStatusCode() != HttpStatusCode::Continue) { - return settingUp; + return result; // Won't upload. } - this->m_rawResponseEOF = false; // Control EOF for response; - return ReadStatusLineAndHeadersFromRawResponse(); + + // Start upload + result = this->UploadBody(context); + if (result != CURLE_OK) { + return result; // will throw transport exception before trying to read + } + ReadStatusLineAndHeadersFromRawResponse(); + return result; } // Creates an HTTP Response with specific bodyType -static std::unique_ptr CreateHTTPResponse(std::string const& header) +static std::unique_ptr CreateHTTPResponse( + uint8_t const* const begin, + uint8_t const* const last) { // set response code, http version and reason phrase (i.e. HTTP/1.1 200 OK) - auto start = header.begin() + 5; // HTTP = 4, / = 1, moving to 5th place for version - auto end = std::find(start, header.end(), '.'); + auto start = begin + 5; // HTTP = 4, / = 1, moving to 5th place for version + auto end = std::find(start, last, '.'); auto majorVersion = std::stoi(std::string(start, end)); start = end + 1; // start of minor version - end = std::find(start, header.end(), ' '); + end = std::find(start, last, ' '); auto minorVersion = std::stoi(std::string(start, end)); start = end + 1; // start of status code - end = std::find(start, header.end(), ' '); + end = std::find(start, last, ' '); auto statusCode = std::stoi(std::string(start, end)); start = end + 1; // start of reason phrase - end = std::find(start, header.end(), '\r'); + end = std::find(start, last, '\r'); auto reasonPhrase = std::string(start, end); // remove \r // allocate the instance of response to heap with shared ptr @@ -116,6 +151,14 @@ static std::unique_ptr CreateHTTPResponse(std::string const& header) (uint16_t)majorVersion, (uint16_t)minorVersion, HttpStatusCode(statusCode), reasonPhrase); } +// Creates an 
HTTP Response with specific bodyType +static std::unique_ptr CreateHTTPResponse(std::string const& header) +{ + return CreateHTTPResponse( + reinterpret_cast(header.data()), + reinterpret_cast(header.data() + header.size())); +} + // To wait for a socket to be ready to be read/write static int WaitForSocketReady(curl_socket_t sockfd, int for_recv, long timeout_ms) { @@ -165,78 +208,94 @@ CURLcode CurlSession::SetConnectOnly() // Send buffer thru the wire CURLcode CurlSession::SendBuffer(uint8_t const* buffer, size_t bufferSize) { - if (bufferSize <= 0) + for (size_t sentBytesTotal = 0; sentBytesTotal < bufferSize;) { - return CURLE_OK; + for (CURLcode sendResult = CURLE_AGAIN; sendResult == CURLE_AGAIN;) + { + size_t sentBytesPerRequest = 0; + sendResult = curl_easy_send( + this->m_pCurl, + buffer + sentBytesTotal, + bufferSize - sentBytesTotal, + &sentBytesPerRequest); + + switch (sendResult) + { + case CURLE_OK: + sentBytesTotal += sentBytesPerRequest; + this->m_uploadedBytes += sentBytesPerRequest; + break; + case CURLE_AGAIN: + if (!WaitForSocketReady(this->m_curlSocket, 0, 60000L)) + { + // TODO: Change this to something more relevant + throw; + } + break; + default: + return sendResult; + } + }; } - size_t sentBytesTotal = 0; - CURLcode sendResult; + return CURLE_OK; +} - do +CURLcode CurlSession::UploadBody(Context& context) { + // Send body UploadStreamPageSize at a time (libcurl default) + // NOTE: if stream is on top a contiguous memory, we can avoid allocating this copying buffer + auto unique_buffer = std::make_unique(UploadStreamPageSize); + auto streamBody = this->m_request.GetBodyStream(); + CURLcode sendResult = CURLE_OK; + + // reusing rawRequestLen variable to read + this->m_uploadedBytes = 0; + while (true) { - size_t sentBytesPerRequest; - do + auto rawRequestLen = streamBody->Read(context, unique_buffer.get(), UploadStreamPageSize); + if (rawRequestLen == 0) { - sentBytesPerRequest = 0; - auto sendFrom = buffer + sentBytesTotal; - auto 
remainingBytes = bufferSize - sentBytesTotal; - - sendResult = curl_easy_send(this->m_pCurl, sendFrom, remainingBytes, &sentBytesPerRequest); - sentBytesTotal += sentBytesPerRequest; - - if (sendResult == CURLE_AGAIN && !WaitForSocketReady(this->m_curlSocket, 0, 60000L)) - { - throw; - } - } while (sendResult == CURLE_AGAIN); - + break; + } + sendResult = SendBuffer(unique_buffer.get(), static_cast(rawRequestLen)); if (sendResult != CURLE_OK) { return sendResult; } - - } while (sentBytesTotal < bufferSize); - - return CURLE_OK; + } + return sendResult; } // custom sending to wire an http request CURLcode CurlSession::HttpRawSend(Context& context) { + // something like GET /path HTTP1.0 \r\nheaders\r\n auto rawRequest = this->m_request.GetHTTPMessagePreBody(); int64_t rawRequestLen = rawRequest.size(); CURLcode sendResult = SendBuffer( reinterpret_cast(rawRequest.data()), static_cast(rawRequestLen)); + if (sendResult != CURLE_OK || this->m_request.GetMethod() == HttpMethod::Put) + { + return sendResult; + } + auto streamBody = this->m_request.GetBodyStream(); if (streamBody->Length() == 0) { - // Finish request with no body - uint8_t const endOfRequest[] = "0"; - return SendBuffer(endOfRequest, 1); // need one more byte to end request + // Finish request with no body (Head or PUT (expect:100;) + uint8_t const endOfRequest = 0; + return SendBuffer(&endOfRequest, 1); // need one more byte to end request } - - // Send body 64k at a time (libcurl default) - // NOTE: if stream is on top a contiguous memory, we can avoid allocating this copying buffer - std::unique_ptr unique_buffer(new uint8_t[UploadStreamPageSize]); - auto buffer = unique_buffer.get(); - while (rawRequestLen > 0) - { - rawRequestLen = streamBody->Read(context, buffer, UploadStreamPageSize); - sendResult = SendBuffer(buffer, static_cast(rawRequestLen)); - } - return sendResult; + return this->UploadBody(context); } // Read status line plus headers to create a response with no body -CURLcode 
CurlSession::ReadStatusLineAndHeadersFromRawResponse() +void CurlSession::ReadStatusLineAndHeadersFromRawResponse() { auto parser = ResponseBufferParser(); auto bufferSize = int64_t(); - // Select a default reading strategy. // No content-length or Transfer-Encoding - this->m_bodyLengthType = ResponseBodyLengthType::ReadToCloseConnection; // Keep reading until all headers were read while (!parser.IsParseCompleted()) @@ -245,12 +304,12 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse() // If response is smaller than buffer, we will get back the size of the response bufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize); - // parse from buffer to create response + // returns the number of bytes parsed up to the body Start auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast(bufferSize)); - // if end of headers is reach before the end of response, that's where body start - if (bytesParsed + 2 < bufferSize) + + if (bytesParsed < bufferSize) { - this->m_bodyStartInBuffer = bytesParsed + 1; // Set the start of body (skip \r) + this->m_bodyStartInBuffer = bytesParsed; // Body Start } } @@ -258,27 +317,27 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse() this->m_innerBufferSize = static_cast(bufferSize); // For Head request, set the length of body response to 0. + // Response will give us content-length as if we were not doing Head saying what would it be the + // length of the body. 
However, Server won't send body if (this->m_request.GetMethod() == HttpMethod::Head) { - this->m_bodyLengthType = ResponseBodyLengthType::NoBody; - return CURLE_OK; + this->m_contentLength = 0; + this->m_rawResponseEOF = true; + return; } - // TODO: tolower ContentLength + // headers are already loweCase at this point auto headers = this->m_response->GetHeaders(); auto isContentLengthHeaderInResponse = headers.find("Content-Length"); if (isContentLengthHeaderInResponse != headers.end()) { - // Response with Content-Length - auto bodySize = std::stoull(headers.at("Content-Length").data()); - // content-length is used later by session and session won't have access to the response any - // more (unique_ptr), so we save this value - this->m_contentLength = bodySize; - this->m_bodyLengthType = ResponseBodyLengthType::ContentLength; - return CURLE_OK; + this->m_contentLength + = static_cast(std::stoull(isContentLengthHeaderInResponse->second.data())); + return; } + this->m_contentLength = -1; auto isTransferEncodingHeaderInResponse = headers.find("Transfer-Encoding"); if (isTransferEncodingHeaderInResponse != headers.end()) { @@ -289,11 +348,42 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse() { // set curl session to know response is chunked // This will be used to remove chunked info while reading - this->m_bodyLengthType = ResponseBodyLengthType::Chunked; - return CURLE_OK; + this->m_isChunkedResponseType = true; + + // Need to move body start after chunk size + if (this->m_bodyStartInBuffer == -1) + { // if nothing on inner buffer, pull from wire + this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize); + this->m_bodyStartInBuffer = 0; + } + + for (bool keepPolling = true; keepPolling;) + { + for (int64_t index = this->m_bodyStartInBuffer; index < this->m_innerBufferSize; index++) + { + if (index > 0 && this->m_readBuffer[index] == '\n') + { + if (index + 1 == bufferSize) + { // on last index. 
Whatever we read is the BodyStart here + this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize); + this->m_bodyStartInBuffer = 0; + } + else + { // not at the end, buffer like [999 \r\nBody...] + this->m_bodyStartInBuffer = index + 1; + } + keepPolling = false; + break; + } + } + if (keepPolling) + { // Read all internal buffer and \n was not found, pull from wire + this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize); + } + } + return; } } - /* https://tools.ietf.org/html/rfc7230#section-3.3.3 7. Otherwise, this is a response message without a declared message @@ -301,111 +391,67 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse() number of octets received prior to the server closing the connection. */ - - // Use unknown size CurlBodyStream. CurlSession will use the ResponseBodyLengthType to select a - // reading strategy - this->m_bodyLengthType = ResponseBodyLengthType::ReadToCloseConnection; - return CURLE_OK; } // Read from curl session int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) { context.ThrowIfCanceled(); - auto totalRead = int64_t(); - // Take data from inner buffer if any - if (this->m_bodyStartInBuffer > 0) + if (count <= 0) { - if (this->m_readBuffer[this->m_bodyStartInBuffer] == '\n' && this->m_sessionTotalRead == 0) - { - // first read. 
Need to move to next position - if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked) - { - // For chunked, first advance until next `\r` after chunked size (\nsomeNumber\r\n) - auto nextPosition = std::find( - this->m_readBuffer + this->m_bodyStartInBuffer, - this->m_readBuffer + this->m_innerBufferSize, - '\r'); - if (nextPosition != this->m_readBuffer + this->m_innerBufferSize) - { - // Found possition of next '\r', +1 jumps the \r - this->m_bodyStartInBuffer - += std::distance(this->m_readBuffer + this->m_bodyStartInBuffer, nextPosition) + 1; - - // Check if the end of body is also at inner buffer - auto endOfChunk = std::find( - this->m_readBuffer + this->m_bodyStartInBuffer, - this->m_readBuffer + this->m_innerBufferSize, - '\r'); - if (endOfChunk != this->m_readBuffer + this->m_innerBufferSize) - { - this->m_innerBufferSize - -= std::distance(endOfChunk, this->m_readBuffer + this->m_innerBufferSize); - } - } // TODO: else read from wire until next \r - } - this->m_bodyStartInBuffer += 1; - } - if (this->m_bodyStartInBuffer < this->m_innerBufferSize) - { - // still have data to take from innerbuffer - MemoryBodyStream innerBufferMemoryStream( - this->m_readBuffer + this->m_bodyStartInBuffer, - this->m_innerBufferSize - this->m_bodyStartInBuffer); - totalRead = innerBufferMemoryStream.Read(context, buffer, count); - this->m_bodyStartInBuffer += totalRead; - this->m_sessionTotalRead += totalRead; - if (this->m_bodyStartInBuffer == this->m_innerBufferSize) - { - this->m_bodyStartInBuffer = 0; // read everyting from inner buffer already - } - return totalRead; - } - // After moving the reading start we reached the end - this->m_bodyStartInBuffer = 0; - } - - // if the last position in inner buffer is `\r` it means the next - // thing we read from wire is `\n`. 
(usually this is when reading 1byte per time from wire) - if (this->m_readBuffer[this->m_innerBufferSize - 1] == '\r') - { - // Read one possition from socket on same user buffer, We wil override the value after this - ReadSocketToBuffer(buffer, 1); - } - - if (this->m_bodyLengthType == ResponseBodyLengthType::ContentLength - && this->m_sessionTotalRead == this->m_contentLength) - { - // Read everything already - curl_easy_cleanup(this->m_pCurl); + // LimitStream would try to read 0 bytes return 0; } - // Read from socket + auto totalRead = int64_t(); + + // Take data from inner buffer if any + if (this->m_bodyStartInBuffer >= 0) + { + // still have data to take from innerbuffer + MemoryBodyStream innerBufferMemoryStream( + this->m_readBuffer + this->m_bodyStartInBuffer, + this->m_innerBufferSize - this->m_bodyStartInBuffer); + + totalRead = innerBufferMemoryStream.Read(context, buffer, count); + this->m_bodyStartInBuffer += totalRead; + this->m_sessionTotalRead += totalRead; + + if (this->m_bodyStartInBuffer == this->m_innerBufferSize) + { + this->m_bodyStartInBuffer = -1; // read everyting from inner buffer already + } + return totalRead; + } + + // Head request have contentLength = 0, so we won't read more, just return 0 + // Also if we have already read all contentLength + if (this->m_sessionTotalRead == this->m_contentLength || this->m_rawResponseEOF) + { + // Read everything already + return 0; + } + + // Read from socket when no more data on internal buffer totalRead = ReadSocketToBuffer(buffer, static_cast(count)); this->m_sessionTotalRead += totalRead; - if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked && totalRead > 0) + if (this->m_isChunkedResponseType && totalRead > 0) { // Check if the end of chunked is part of the body auto endOfBody = std::find(buffer, buffer + totalRead, '\r'); if (endOfBody != buffer + totalRead) { + // End of stream is when chunk is 0 (0\r\n) if (buffer[0] == '0' && buffer + 1 == endOfBody) { - // got already the end 
- curl_easy_cleanup(this->m_pCurl); + // got already the end. Usually when all body was at innerBuffer and next pull from wire + // returns only the end of chunk return 0; } - // Read all remaining to close connection - { - constexpr int64_t finalRead = 50; // usually only 5 more bytes are gotten "0\r\n\r\n" - uint8_t b[finalRead]; - ReadSocketToBuffer(b, finalRead); - curl_easy_cleanup(this->m_pCurl); - } totalRead -= std::distance(endOfBody, buffer + totalRead); + this->m_rawResponseEOF = true; } } return totalRead; @@ -414,23 +460,28 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_ // Read from socket and return the number of bytes taken from socket int64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize) { - CURLcode readResult; + // loop until read result is not CURLE_AGAIN size_t readBytes = 0; - - do // try to read from socket until response is OK + for (CURLcode readResult = CURLE_AGAIN; readResult == CURLE_AGAIN;) { readResult = curl_easy_recv(this->m_pCurl, buffer, static_cast(bufferSize), &readBytes); - if (readResult == CURLE_AGAIN) - { - readResult = CURLE_AGAIN; - } - // socket not ready. 
Wait or fail on timeout - if (readResult == CURLE_AGAIN && !WaitForSocketReady(this->m_curlSocket, 1, 60000L)) - { - throw; - } - } while (readResult == CURLE_AGAIN); // Keep trying to read until result is not CURLE_AGAIN + switch (readResult) + { + case CURLE_AGAIN: + if (!WaitForSocketReady(this->m_curlSocket, 0, 60000L)) + { + // TODO: Change this to somehing more relevant + throw; + } + break; + case CURLE_OK: + break; + default: + // Error code while reading from socket + return -1; + } + } return readBytes; } @@ -448,32 +499,113 @@ int64_t CurlSession::ResponseBufferParser::Parse( return 0; } - switch (this->state) + // Read all buffer until \r\n is found + int64_t start = 0, index = 0; + for (; index < bufferSize; index++) { - case ResponseParserState::StatusLine: { - auto parsedBytes = BuildStatusCode(buffer, bufferSize); - if (parsedBytes < bufferSize) // status code is built and buffer can be still parsed - { - // Can keep parsing, Control have moved to headers - return parsedBytes + Parse(buffer + parsedBytes, bufferSize - parsedBytes); - } - return parsedBytes; + if (buffer[index] == '\r') + { + this->m_delimiterStartInPrevPosition = true; + continue; } - case ResponseParserState::Headers: { - auto parsedBytes = BuildHeader(buffer, bufferSize); - if (!this->m_parseCompleted - && parsedBytes < bufferSize) // status code is built and buffer can be still parsed + + if (buffer[index] == '\n' && this->m_delimiterStartInPrevPosition) + { + // found end of delimiter + if (this->m_internalBuffer.size() > 0) // Check internal buffer { - // Can keep parsing, Control have moved to headers - return parsedBytes + Parse(buffer + parsedBytes, bufferSize - parsedBytes); + // At this point, we are reading to append more to internal buffer. 
+ // Only append more if index is greater than 1, meaning not when buffer is [\r\nxxx] + // only on buffer like [xxx\r\n yyyy], append xxx + if (index > 1) + { + // Previously appended something + this->m_internalBuffer.append(buffer + start, buffer + index - 1); // minus 1 to remove \r + } + if (this->state == ResponseParserState::StatusLine) + { + // Create Response + this->m_response = CreateHTTPResponse(this->m_internalBuffer); + // Set state to headers + this->state = ResponseParserState::Headers; + this->m_delimiterStartInPrevPosition = false; + start = index + 1; // jump \n + } + else if (this->state == ResponseParserState::Headers) + { + // Add header. TODO: Do toLower so all headers are lowerCase + this->m_response->AddHeader(this->m_internalBuffer); + this->m_delimiterStartInPrevPosition = false; + start = index + 1; // jump \n + } + else + { + // Should never happen that parser is not StatusLine or Headers and we still try to parse + // more. + throw; + } + // clean internal buffer + this->m_internalBuffer.clear(); + } + else + { + // Nothing at internal buffer. Add directly from the incoming buffer + if (this->state == ResponseParserState::StatusLine) + { + // Create Response + this->m_response = CreateHTTPResponse(buffer + start, buffer + index - 1); + // Set state to headers + this->state = ResponseParserState::Headers; + this->m_delimiterStartInPrevPosition = false; + start = index + 1; // jump \n + } + else if (this->state == ResponseParserState::Headers) + { + // Check if this is end of headers delimiter + // 1) internal buffer is empty and \n is the first char on buffer [\nBody...] + // 2) index == start + 1. No header data after last \r\n [header\r\n\r\n] + if (index == 0 || index == start + 1) + { + this->m_parseCompleted = true; + return index + 1; // plus 1 to advance the \n. If we were at buffer end. + } + + // Add header. 
TODO: Do toLower so all headers are lowerCase + this->m_response->AddHeader(buffer + start, buffer + index - 1); + this->m_delimiterStartInPrevPosition = false; + start = index + 1; // jump \n + } + else + { + // Should never happen that parser is not statusLIne or Headers and we still try to parse + // more. + throw; + } } - return parsedBytes; } - case ResponseParserState::EndOfHeaders: - default: { - return 0; + else + { + if (index == 0 && this->m_internalBuffer.size() > 0 && this->m_delimiterStartInPrevPosition) + { + // unlikely. But this means a case with buffers like [xx\r], [xxxx] + // \r is not delimiter and in previous loop it was omitted, so adding it now + this->m_internalBuffer.append("\r"); + } + // \r in the response without \n after it. keep parsing + this->m_delimiterStartInPrevPosition = false; } } + + if (start < bufferSize) + { + // didn't find the end of delimiter yet, save at internal buffer + // If this->m_delimiterStartInPrevPosition is true, buffer ends in \r [xxxx\r] + // Don't add \r. IF next char is not \n, we will append \r then on next loop + this->m_internalBuffer.append( + buffer + start, buffer + bufferSize - (this->m_delimiterStartInPrevPosition ? 1 : 0)); + } + + return index; } // Finds delimiter '\r' as the end of the diff --git a/sdk/core/azure-core/src/http/response.cpp b/sdk/core/azure-core/src/http/response.cpp index b83d4c4df..0b5ebb9ff 100644 --- a/sdk/core/azure-core/src/http/response.cpp +++ b/sdk/core/azure-core/src/http/response.cpp @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. 
// SPDX-License-Identifier: MIT +#include +#include #include #include @@ -16,35 +18,42 @@ std::string const& Response::GetReasonPhrase() { return m_reasonPhrase; } std::map const& Response::GetHeaders() const { return this->m_headers; } -void Response::AddHeader(std::string const& header) +void Response::AddHeader(uint8_t const* const begin, uint8_t const* const last) { // get name and value from header - auto start = header.begin(); - auto end = std::find(start, header.end(), ':'); + auto start = begin; + auto end = std::find(start, last, ':'); - if (end == header.end()) + if (end == last) { return; // not a valid header or end of headers symbol reached } + // Always toLower() headers + // auto headerName = Azure::Core::Details::ToLower(std::string(start, end)); auto headerName = std::string(start, end); start = end + 1; // start value - while (start < header.end() && (*start == ' ' || *start == '\t')) + while (start < last && (*start == ' ' || *start == '\t')) { ++start; } - end = std::find(start, header.end(), '\r'); + end = std::find(start, last, '\r'); auto headerValue = std::string(start, end); // remove \r AddHeader(headerName, headerValue); } +void Response::AddHeader(std::string const& header) +{ + return AddHeader( + reinterpret_cast(header.data()), + reinterpret_cast(header.data() + header.size())); +} + void Response::AddHeader(std::string const& name, std::string const& value) { - // TODO: make sure the Content-Length header is insterted as "Content-Length" no mather the case - // We currently assume we receive it like it and expected to be there from all HTTP - // Responses. 
+ this->m_headers.insert(std::pair(name, value)); } diff --git a/sdk/core/azure-core/src/strings.cpp b/sdk/core/azure-core/src/strings.cpp index 4f45d599e..ef2f840ff 100644 --- a/sdk/core/azure-core/src/strings.cpp +++ b/sdk/core/azure-core/src/strings.cpp @@ -65,6 +65,20 @@ const unsigned char c_LocaleInvariantUppercaseTable[256] = { namespace Azure { namespace Core { namespace Details { + std::string const ToLower(const std::string& src) noexcept + { + auto result = std::string(src); + for (auto i = result.begin(); i < result.end(); i++) + { + *i = c_LocaleInvariantUppercaseTable[static_cast(*i)]; + if (*i >= 'A' && *i <= 'Z') + { + *i += 32; + } + } + return result; + } + bool LocaleInvariantCaseInsensitiveEqual(const std::string& lhs, const std::string& rhs) noexcept { return std::equal( diff --git a/sdk/samples/http_client/curl/CMakeLists.txt b/sdk/core/azure-core/test/e2e/CMakeLists.txt similarity index 50% rename from sdk/samples/http_client/curl/CMakeLists.txt rename to sdk/core/azure-core/test/e2e/CMakeLists.txt index 494363e65..df1bf82fd 100644 --- a/sdk/samples/http_client/curl/CMakeLists.txt +++ b/sdk/core/azure-core/test/e2e/CMakeLists.txt @@ -6,6 +6,8 @@ if (BUILD_CURL_TRANSPORT) cmake_minimum_required (VERSION 3.12) set(TARGET_NAME "azure_core_with_curl") set(TARGET_NAME_STREAM "azure_core_with_curl_stream") +set(TARGET_NAME_STORAGE_ISSUE_249 "azure_core_storage_issue_249") +set(TARGET_NAME_STORAGE_ISSUE_248 "azure_core_storage_issue_248") project(${TARGET_NAME} LANGUAGES CXX) set(CMAKE_CXX_STANDARD 14) @@ -13,15 +15,27 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) add_executable ( ${TARGET_NAME} - src/azure_core_with_curl_bodyBuffer + azure_core_with_curl_bodyBuffer ) add_executable ( ${TARGET_NAME_STREAM} - src/azure_core_with_curl_bodyStream + azure_core_with_curl_bodyStream +) + +add_executable ( + ${TARGET_NAME_STORAGE_ISSUE_249} + azure_core_storage_test_sample +) + +add_executable ( + ${TARGET_NAME_STORAGE_ISSUE_248} + 
azure_core_storage_list_containers_sample ) target_link_libraries(${TARGET_NAME} PRIVATE azure-core) target_link_libraries(${TARGET_NAME_STREAM} PRIVATE azure-core) +target_link_libraries(${TARGET_NAME_STORAGE_ISSUE_249} PRIVATE azure-core) +target_link_libraries(${TARGET_NAME_STORAGE_ISSUE_248} PRIVATE azure-core) endif() diff --git a/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp b/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp new file mode 100644 index 000000000..b65eb6089 --- /dev/null +++ b/sdk/core/azure-core/test/e2e/azure_core_storage_list_containers_sample.cpp @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// SPDX-License-Identifier: MIT + +#include "http/pipeline.hpp" + +#include +#include +#include +#include +#include +#include + +using namespace Azure::Core; +using namespace Azure::Core::Http; +using namespace std; + +int main() +{ + + // Create the Transport + std::shared_ptr transport = std::make_unique(); + + std::vector> policies; + + // Add the transport policy + policies.push_back(std::make_unique(std::move(transport))); + + auto httpPipeline = Http::HttpPipeline(policies); + + auto context = Context(); + + string host("http://anglesharp.azurewebsites.net/Chunked"); + + auto request = Http::Request(Http::HttpMethod::Get, host); + + auto response = httpPipeline.Send(context, request); + auto response_bodystream = response->GetBodyStream(); + auto response_body = BodyStream::ReadToEnd(context, *response_bodystream); + + cout << response_body.data() << endl; + + return 0; +} diff --git a/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp b/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp new file mode 100644 index 000000000..0cb8a8259 --- /dev/null +++ b/sdk/core/azure-core/test/e2e/azure_core_storage_test_sample.cpp @@ -0,0 +1,56 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. 
+// SPDX-License-Identifier: MIT + +#ifdef _MSC_VER +#define _CRT_SECURE_NO_WARNINGS +#endif + +#include "http/pipeline.hpp" + +#include +#include +#include +#include +#include +#include + +using namespace Azure::Core; +using namespace Azure::Core::Http; +using namespace std; + +int main() +{ + + // Create the Transport + std::shared_ptr transport = std::make_unique(); + + std::vector> policies; + + // Add the transport policy + policies.push_back(std::make_unique(std::move(transport))); + + auto httpPipeline = Http::HttpPipeline(policies); + + auto context = Context(); + + // STORAGE_BLOB_WITH_SAS = like + // "https://account.windows.net/azure/container/blob?sv=...&ss=...&..." + string host(std::getenv("STORAGE_BLOB_WITH_SAS")); + + std::vector request_bodydata(500 * 1024 * 1024, '1'); + cout << request_bodydata.size() << endl; + + MemoryBodyStream requestBodyStream(request_bodydata.data(), request_bodydata.size()); + auto request = Http::Request(Http::HttpMethod::Put, host, &requestBodyStream); + request.AddHeader("Content-Length", std::to_string(request_bodydata.size())); + request.AddHeader("x-ms-version", "2019-07-07"); + request.AddHeader("x-ms-blob-type", "BlockBlob"); + + auto response = httpPipeline.Send(context, request); + + auto bodyS = response->GetBodyStream(); + auto body = BodyStream::ReadToEnd(context, *bodyS); + cout << body.data() << endl; + + return 0; +} diff --git a/sdk/samples/http_client/curl/src/azure_core_with_curl_bodyBuffer.cpp b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp similarity index 100% rename from sdk/samples/http_client/curl/src/azure_core_with_curl_bodyBuffer.cpp rename to sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp index 91ace1da0..23577fcc0 100644 --- a/sdk/samples/http_client/curl/src/azure_core_with_curl_bodyBuffer.cpp +++ b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyBuffer.cpp @@ -54,10 +54,10 @@ int main() auto context = Context(); // Both requests uses a body buffer 
to be uploaded that would produce responses with bodyBuffer + doHeadRequest(context, httpPipeline); doFileRequest(context, httpPipeline); doGetRequest(context, httpPipeline); doPutRequest(context, httpPipeline); - doHeadRequest(context, httpPipeline); doDeleteRequest(context, httpPipeline); doPatchRequest(context, httpPipeline); } diff --git a/sdk/samples/http_client/curl/src/azure_core_with_curl_bodyStream.cpp b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp similarity index 95% rename from sdk/samples/http_client/curl/src/azure_core_with_curl_bodyStream.cpp rename to sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp index d64011185..78ad7ec22 100644 --- a/sdk/samples/http_client/curl/src/azure_core_with_curl_bodyStream.cpp +++ b/sdk/core/azure-core/test/e2e/azure_core_with_curl_bodyStream.cpp @@ -98,8 +98,8 @@ void doGetRequest(Context context, HttpPipeline& pipeline) request.AddHeader("Host", "httpbin.org"); - request.AddQueryParameter("dinamicArg", "3"); - request.AddQueryParameter("dinamicArg2", "4"); + request.AddQueryParameter("dynamicArg", "3"); + request.AddQueryParameter("dynamicArg2", "4"); auto response = pipeline.Send(context, request); printStream(context, std::move(response)); @@ -155,9 +155,9 @@ void doPutStreamRequest(Context context, HttpPipeline& pipeline) request.AddHeader("Content-Length", std::to_string(StreamSize)); - request.AddQueryParameter("dinamicArg", "1"); - request.AddQueryParameter("dinamicArg2", "1"); - request.AddQueryParameter("dinamicArg3", "1"); + request.AddQueryParameter("dynamicArg", "1"); + request.AddQueryParameter("dynamicArg2", "1"); + request.AddQueryParameter("dynamicArg3", "1"); printStream(context, std::move(pipeline.Send(context, request))); } diff --git a/sdk/core/azure-core/test/CMakeLists.txt b/sdk/core/azure-core/test/ut/CMakeLists.txt similarity index 100% rename from sdk/core/azure-core/test/CMakeLists.txt rename to sdk/core/azure-core/test/ut/CMakeLists.txt diff --git 
a/sdk/core/azure-core/test/http.cpp b/sdk/core/azure-core/test/ut/http.cpp similarity index 100% rename from sdk/core/azure-core/test/http.cpp rename to sdk/core/azure-core/test/ut/http.cpp diff --git a/sdk/core/azure-core/test/main.cpp b/sdk/core/azure-core/test/ut/main.cpp similarity index 100% rename from sdk/core/azure-core/test/main.cpp rename to sdk/core/azure-core/test/ut/main.cpp diff --git a/sdk/core/azure-core/test/nullable.cpp b/sdk/core/azure-core/test/ut/nullable.cpp similarity index 100% rename from sdk/core/azure-core/test/nullable.cpp rename to sdk/core/azure-core/test/ut/nullable.cpp diff --git a/sdk/core/azure-core/test/string.cpp b/sdk/core/azure-core/test/ut/string.cpp similarity index 100% rename from sdk/core/azure-core/test/string.cpp rename to sdk/core/azure-core/test/ut/string.cpp