diff --git a/sdk/core/azure-core/inc/http/curl/curl.hpp b/sdk/core/azure-core/inc/http/curl/curl.hpp
index fdd884fa7..d0f58454f 100644
--- a/sdk/core/azure-core/inc/http/curl/curl.hpp
+++ b/sdk/core/azure-core/inc/http/curl/curl.hpp
@@ -237,6 +237,13 @@ namespace Azure { namespace Core { namespace Http {
      */
     int64_t m_contentLength;
 
+    /**
+     * @brief For chunked responses, the number of bytes of the current chunk that are still to
+     * be read.
+     *
+     */
+    int64_t m_chunkSize;
+
     int64_t m_sessionTotalRead = 0;
 
     /**
@@ -335,6 +342,13 @@ namespace Azure { namespace Core { namespace Http {
      */
     void ReadStatusLineAndHeadersFromRawResponse();
 
+    /**
+     * @brief Reads from the inner buffer, pulling from the wire when it runs out, until the
+     * hexadecimal chunk-size line is complete, and stores the parsed value in m_chunkSize
+     *
+     */
+    void ParseChunkSize();
+
     /**
      * @brief This function is used when working with streams to pull more data from the wire.
      * Function will try to keep pulling data from socket until the buffer is all written or until
diff --git a/sdk/core/azure-core/src/http/curl/curl.cpp b/sdk/core/azure-core/src/http/curl/curl.cpp
index 98ee5f673..245308c34 100644
--- a/sdk/core/azure-core/src/http/curl/curl.cpp
+++ b/sdk/core/azure-core/src/http/curl/curl.cpp
@@ -296,6 +296,47 @@ CURLcode CurlSession::HttpRawSend(Context& context)
   return this->UploadBody(context);
 }
 
+void CurlSession::ParseChunkSize()
+{
+  // Accumulates the chunk-size line across reads. The inner buffer can end in the middle of
+  // it, like [headers...\r\n123], where 123 is only part of the chunk size and more data must
+  // be pulled from the wire to complete it. Next data could be just [\r\n] or [456\r\n]
+  auto strChunkSize = std::string();
+
+  // Move to after chunk size
+  for (bool keepPolling = true; keepPolling;)
+  {
+    for (int64_t index = this->m_bodyStartInBuffer; index < this->m_innerBufferSize; index++)
+    {
+      strChunkSize.append(reinterpret_cast<char*>(&this->m_readBuffer[index]), 1);
+      // Test the accumulated length, not a per-pass counter: the line may span several reads
+      if (strChunkSize.size() > 2 && this->m_readBuffer[index] == '\n')
+      {
+        // get chunk size. Chunk size comes in Hex value
+        this->m_chunkSize = static_cast<int64_t>(std::stoull(strChunkSize, nullptr, 16));
+
+        if (index + 1 == this->m_innerBufferSize)
+        { // on last index. Whatever we read is the BodyStart here
+          this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
+          this->m_bodyStartInBuffer = 0;
+        }
+        else
+        { // not at the end, buffer like [999 \r\nBody...]
+          this->m_bodyStartInBuffer = index + 1;
+        }
+        keepPolling = false;
+        break;
+      }
+    }
+    if (keepPolling)
+    { // Read all internal buffer and \n was not found, pull from wire
+      this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
+      this->m_bodyStartInBuffer = 0;
+    }
+  }
+  return;
+}
+
 // Read status line plus headers to create a response with no body
 void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
 {
@@ -362,30 +403,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
         this->m_bodyStartInBuffer = 0;
       }
 
-      for (bool keepPolling = true; keepPolling;)
-      {
-        for (int64_t index = this->m_bodyStartInBuffer; index < this->m_innerBufferSize; index++)
-        {
-          if (index > 0 && this->m_readBuffer[index] == '\n')
-          {
-            if (index + 1 == bufferSize)
-            { // on last index. Whatever we read is the BodyStart here
-              this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
-              this->m_bodyStartInBuffer = 0;
-            }
-            else
-            { // not at the end, buffer like [999 \r\nBody...]
-              this->m_bodyStartInBuffer = index + 1;
-            }
-            keepPolling = false;
-            break;
-          }
-        }
-        if (keepPolling)
-        { // Read all internal buffer and \n was not found, pull from wire
-          this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
-        }
-      }
+      ParseChunkSize();
       return;
     }
   }
@@ -409,7 +427,36 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
     return 0;
   }
 
+  // check if the current chunk was fully read already
+  if (this->m_isChunkedResponseType && this->m_chunkSize == 0)
+  {
+    // Need to skip the CRLF that follows the chunk data
+    for (int8_t i = 0; i < 2; i++)
+    {
+      if (this->m_bodyStartInBuffer > 0 && this->m_bodyStartInBuffer < this->m_innerBufferSize)
+      {
+        this->m_bodyStartInBuffer += 1;
+      }
+      else
+      { // end of buffer, pull data from wire
+        this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
+        this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n)
+      }
+    }
+    // get the size of next chunk
+    ParseChunkSize();
+
+    if (this->m_chunkSize == 0)
+    {
+      // end of transfer
+      this->m_rawResponseEOF = true;
+      return 0;
+    }
+  }
+
   auto totalRead = int64_t();
+  auto ReadRequestLength
+      = this->m_isChunkedResponseType ? std::min(this->m_chunkSize, count) : count;
 
   // Take data from inner buffer if any
   if (this->m_bodyStartInBuffer >= 0)
@@ -419,9 +466,13 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
         this->m_readBuffer + this->m_bodyStartInBuffer,
         this->m_innerBufferSize - this->m_bodyStartInBuffer);
 
-    totalRead = innerBufferMemoryStream.Read(context, buffer, count);
+    totalRead = innerBufferMemoryStream.Read(context, buffer, ReadRequestLength);
     this->m_bodyStartInBuffer += totalRead;
     this->m_sessionTotalRead += totalRead;
+    if (this->m_isChunkedResponseType)
+    {
+      this->m_chunkSize -= totalRead;
+    }
 
     if (this->m_bodyStartInBuffer == this->m_innerBufferSize)
     {
@@ -439,26 +490,14 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
   }
 
   // Read from socket when no more data on internal buffer
-  totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(count));
+  // For chunk request, read a chunk based on chunk size
+  totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(ReadRequestLength));
   this->m_sessionTotalRead += totalRead;
-
-  if (this->m_isChunkedResponseType && totalRead > 0)
+  if (this->m_isChunkedResponseType)
   {
-    // Check if the end of chunked is part of the body
-    auto endOfBody = std::find(buffer, buffer + totalRead, '\r');
-    if (endOfBody != buffer + totalRead)
-    {
-      // End of stream is when chunk is 0 (0\r\n)
-      if (buffer[0] == '0' && buffer + 1 == endOfBody)
-      {
-        // got already the end. Usually when all body was at innerBuffer and next pull from wire
-        // returns only the end of chunk
-        return 0;
-      }
-      totalRead -= std::distance(endOfBody, buffer + totalRead);
-      this->m_rawResponseEOF = true;
-    }
+    this->m_chunkSize -= totalRead;
   }
+
   return totalRead;
 }
 