Http transport adapter - read all chunks from server until chunkSize is zero (#273)
* fix for getting all chunks from a chunked response until chuksize is zero * fix for pulling from wire when inner buffer is empty
This commit is contained in:
parent
05c91d6002
commit
c0faea5f90
@ -237,6 +237,13 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*/
|
||||
int64_t m_contentLength;
|
||||
|
||||
/**
|
||||
* @brief For chunked responses, this field knows the size of the current chuck size server will
|
||||
* de sending
|
||||
*
|
||||
*/
|
||||
int64_t m_chunkSize;
|
||||
|
||||
int64_t m_sessionTotalRead = 0;
|
||||
|
||||
/**
|
||||
@ -335,6 +342,13 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*/
|
||||
void ReadStatusLineAndHeadersFromRawResponse();
|
||||
|
||||
/**
|
||||
* @brief Reads from inner buffer or from Wire until chunkSize is parsed and converted to
|
||||
* unsigned long long
|
||||
*
|
||||
*/
|
||||
void ParseChunkSize();
|
||||
|
||||
/**
|
||||
* @brief This function is used when working with streams to pull more data from the wire.
|
||||
* Function will try to keep pulling data from socket until the buffer is all written or until
|
||||
|
||||
@ -296,6 +296,47 @@ CURLcode CurlSession::HttpRawSend(Context& context)
|
||||
return this->UploadBody(context);
|
||||
}
|
||||
|
||||
void CurlSession::ParseChunkSize()
|
||||
{
|
||||
// Use this string to construct the chunk size. This is because we could have an internal
|
||||
// buffer like [headers...\r\n123], where 123 is chunk size but we still need to pull more
|
||||
// data fro wire to get the full chunkSize. Next data could be just [\r\n] or [456\r\n]
|
||||
auto strChunkSize = std::string();
|
||||
|
||||
// Move to after chunk size
|
||||
for (bool keepPolling = true; keepPolling;)
|
||||
{
|
||||
for (int64_t index = this->m_bodyStartInBuffer, i = 0; index < this->m_innerBufferSize;
|
||||
index++, i++)
|
||||
{
|
||||
strChunkSize.append(reinterpret_cast<char*>(&this->m_readBuffer[index]), 1);
|
||||
if (i > 1 && this->m_readBuffer[index] == '\n')
|
||||
{
|
||||
// get chunk size. Chunk size comes in Hex value
|
||||
this->m_chunkSize = static_cast<int64_t>(std::stoull(strChunkSize, nullptr, 16));
|
||||
|
||||
if (index + 1 == this->m_innerBufferSize)
|
||||
{ // on last index. Whatever we read is the BodyStart here
|
||||
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
else
|
||||
{ // not at the end, buffer like [999 \r\nBody...]
|
||||
this->m_bodyStartInBuffer = index + 1;
|
||||
}
|
||||
keepPolling = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (keepPolling)
|
||||
{ // Read all internal buffer and \n was not found, pull from wire
|
||||
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Read status line plus headers to create a response with no body
|
||||
void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
|
||||
{
|
||||
@ -362,30 +403,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
|
||||
for (bool keepPolling = true; keepPolling;)
|
||||
{
|
||||
for (int64_t index = this->m_bodyStartInBuffer; index < this->m_innerBufferSize; index++)
|
||||
{
|
||||
if (index > 0 && this->m_readBuffer[index] == '\n')
|
||||
{
|
||||
if (index + 1 == bufferSize)
|
||||
{ // on last index. Whatever we read is the BodyStart here
|
||||
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
else
|
||||
{ // not at the end, buffer like [999 \r\nBody...]
|
||||
this->m_bodyStartInBuffer = index + 1;
|
||||
}
|
||||
keepPolling = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (keepPolling)
|
||||
{ // Read all internal buffer and \n was not found, pull from wire
|
||||
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
|
||||
}
|
||||
}
|
||||
ParseChunkSize();
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -409,7 +427,36 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
|
||||
return 0;
|
||||
}
|
||||
|
||||
// check if all chunked is read already
|
||||
if (this->m_isChunkedResponseType && this->m_chunkSize == 0)
|
||||
{
|
||||
// Need to read CRLF after all chunk was read
|
||||
for (int8_t i = 0; i < 2; i++)
|
||||
{
|
||||
if (this->m_bodyStartInBuffer > 0 && this->m_bodyStartInBuffer < this->m_innerBufferSize)
|
||||
{
|
||||
this->m_bodyStartInBuffer += 1;
|
||||
}
|
||||
else
|
||||
{ // end of buffer, pull data from wire
|
||||
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n)
|
||||
}
|
||||
}
|
||||
// get the size of next chunk
|
||||
ParseChunkSize();
|
||||
|
||||
if (this->m_chunkSize == 0)
|
||||
{
|
||||
// end of transfer
|
||||
this->m_rawResponseEOF = true;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
auto totalRead = int64_t();
|
||||
auto ReadRequestLength
|
||||
= this->m_isChunkedResponseType ? std::min(this->m_chunkSize, count) : count;
|
||||
|
||||
// Take data from inner buffer if any
|
||||
if (this->m_bodyStartInBuffer >= 0)
|
||||
@ -419,9 +466,13 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
|
||||
this->m_readBuffer + this->m_bodyStartInBuffer,
|
||||
this->m_innerBufferSize - this->m_bodyStartInBuffer);
|
||||
|
||||
totalRead = innerBufferMemoryStream.Read(context, buffer, count);
|
||||
totalRead = innerBufferMemoryStream.Read(context, buffer, ReadRequestLength);
|
||||
this->m_bodyStartInBuffer += totalRead;
|
||||
this->m_sessionTotalRead += totalRead;
|
||||
if (this->m_isChunkedResponseType)
|
||||
{
|
||||
this->m_chunkSize -= totalRead;
|
||||
}
|
||||
|
||||
if (this->m_bodyStartInBuffer == this->m_innerBufferSize)
|
||||
{
|
||||
@ -439,26 +490,14 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
|
||||
}
|
||||
|
||||
// Read from socket when no more data on internal buffer
|
||||
totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(count));
|
||||
// For chunk request, read a chunk based on chunk size
|
||||
totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(ReadRequestLength));
|
||||
this->m_sessionTotalRead += totalRead;
|
||||
|
||||
if (this->m_isChunkedResponseType && totalRead > 0)
|
||||
if (this->m_isChunkedResponseType)
|
||||
{
|
||||
// Check if the end of chunked is part of the body
|
||||
auto endOfBody = std::find(buffer, buffer + totalRead, '\r');
|
||||
if (endOfBody != buffer + totalRead)
|
||||
{
|
||||
// End of stream is when chunk is 0 (0\r\n)
|
||||
if (buffer[0] == '0' && buffer + 1 == endOfBody)
|
||||
{
|
||||
// got already the end. Usually when all body was at innerBuffer and next pull from wire
|
||||
// returns only the end of chunk
|
||||
return 0;
|
||||
}
|
||||
totalRead -= std::distance(endOfBody, buffer + totalRead);
|
||||
this->m_rawResponseEOF = true;
|
||||
}
|
||||
this->m_chunkSize -= totalRead;
|
||||
}
|
||||
|
||||
return totalRead;
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user