Make waiting for socket cancelable (#719)
* Make waiting for socket cancelable
This commit is contained in:
parent
aa18492bb9
commit
724e6ca512
@ -12,6 +12,7 @@
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <new> //For the non-allocating placement new
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
|
||||
@ -25,6 +26,11 @@ namespace Azure { namespace Core {
|
||||
virtual ~ValueBase() {}
|
||||
};
|
||||
|
||||
struct RequestCanceledException : public std::runtime_error
|
||||
{
|
||||
explicit RequestCanceledException(std::string const& msg) : std::runtime_error(msg) {}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Represents a value that is associated with context.
|
||||
* @rmark Exists as a substitute for variant which isn't available until C++17.
|
||||
@ -383,8 +389,7 @@ namespace Azure { namespace Core {
|
||||
{
|
||||
if (CancelWhen() < std::chrono::system_clock::now())
|
||||
{
|
||||
// TODO: Runtime Exc
|
||||
throw;
|
||||
throw RequestCanceledException("Request was canceled by context.");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -6,8 +6,10 @@
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#ifdef BUILD_CURL_HTTP_TRANSPORT_ADAPTER
|
||||
|
||||
#include "azure/core/context.hpp"
|
||||
#include "azure/core/http/http.hpp"
|
||||
#include "azure/core/http/policy.hpp"
|
||||
#include "azure/core/internal/log.hpp"
|
||||
@ -88,7 +90,7 @@ namespace Azure { namespace Core { namespace Http {
|
||||
|
||||
/**
|
||||
* @brief Checks whether this CURL connection is expired.
|
||||
* @return `true` if this connextion is onsidered expired, `false` oterwise.
|
||||
* @return `true` if this connection is considered expired, `false` otherwise.
|
||||
*/
|
||||
bool isExpired()
|
||||
{
|
||||
@ -99,7 +101,11 @@ namespace Azure { namespace Core { namespace Http {
|
||||
};
|
||||
|
||||
/**
|
||||
* CURL HTTP connection pool.
|
||||
* @brief CURL HTTP connection pool makes it possible to re-use one curl connection to perform
|
||||
* more than one request. Use this component when connections are not re-used by default.
|
||||
*
|
||||
* This pool offers static methods and it is allocated statically. There can be only one connection
|
||||
* pool per application.
|
||||
*/
|
||||
struct CurlConnectionPool
|
||||
{
|
||||
@ -175,24 +181,18 @@ namespace Azure { namespace Core { namespace Http {
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Stateful component that controls sending an HTTP Request with libcurl thru the wire
|
||||
* and parsing and building an HTTP RawResponse. This session supports the classic libcurl easy
|
||||
* interface to send and receive bytes from network using callbacks. This session also supports
|
||||
* working with the custom HTTP protocol option from libcurl to manually upload and download
|
||||
* bytes using a network socket. This implementation is used when working with streams so
|
||||
* customers can lazily pull data from network using an stream abstraction.
|
||||
* @brief Stateful component that controls sending an HTTP Request with libcurl over the wire.
|
||||
*
|
||||
* @remark This component does not use the classic libcurl easy interface to send and receive
|
||||
* bytes from the network using callbacks. Instead, `CurlSession` supports working with the custom HTTP
|
||||
* protocol option from libcurl to manually upload and download bytes from the network socket using
|
||||
* curl_easy_send() and curl_easy_recv().
|
||||
*
|
||||
* @remarks This component is expected to be used by an HTTP Transporter to ensure that
|
||||
* transporter to be reusable in multiple pipelines while every call to network is unique.
|
||||
*/
|
||||
class CurlSession : public BodyStream {
|
||||
private:
|
||||
/*
|
||||
* Static Connection pool for the application. Multiple CurlSessions will use the connection
|
||||
* pool for getting a curl handle connection.
|
||||
*
|
||||
*/
|
||||
|
||||
/*
|
||||
* Enum used by ResponseBufferParser to control the parsing internal state while building
|
||||
* the HTTP RawResponse
|
||||
@ -209,8 +209,8 @@ namespace Azure { namespace Core { namespace Http {
|
||||
* @brief stateful component used to read and parse a buffer to construct a valid HTTP
|
||||
* RawResponse.
|
||||
*
|
||||
* It uses an internal string as buffers to accumulate a response token (version, code,
|
||||
* header, etc) until the next delimiter is found. Then it uses this string to keep building
|
||||
* @remark It uses an internal string as a buffer to accumulate a response token (version, code,
|
||||
* header, etc.) until the next delimiter is found. Then it uses this string to keep building
|
||||
* the HTTP RawResponse.
|
||||
*
|
||||
* @remark Only status line and headers are parsed and built. Body is ignored by this
|
||||
@ -283,7 +283,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a new RawResponse Buffer Parser object.
|
||||
* Set the initial state and parsing completion.
|
||||
*
|
||||
*/
|
||||
ResponseBufferParser()
|
||||
@ -293,8 +292,6 @@ namespace Azure { namespace Core { namespace Http {
|
||||
this->m_delimiterStartInPrevPosition = false;
|
||||
}
|
||||
|
||||
// Parse contents of buffer to construct HttpResponse. Returns the index of the last parsed
|
||||
// position. Return bufferSize when all buffer was used to parse
|
||||
/**
|
||||
* @brief Parses the content of a buffer to construct a valid HTTP RawResponse. This method
|
||||
* is expected to be called over and over until it returns 0, indicating there is nothing
|
||||
@ -315,7 +312,7 @@ namespace Azure { namespace Core { namespace Http {
|
||||
/**
|
||||
* @brief Indicates when the parser has completed parsing and building the HTTP RawResponse.
|
||||
*
|
||||
* @return true if parsing is completed. Otherwise false.
|
||||
* @return `true` if parsing is completed. Otherwise `false`.
|
||||
*/
|
||||
bool IsParseCompleted() const { return this->m_parseCompleted; }
|
||||
|
||||
@ -476,40 +473,46 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*
|
||||
* @remarks Hardcoded timeout is used in case a socket stop responding.
|
||||
*
|
||||
* @param context #Context so that operation can be canceled.
|
||||
* @param buffer ptr to the data to be sent to wire.
|
||||
* @param bufferSize size of the buffer to send.
|
||||
* @return CURL_OK when response is sent successfully.
|
||||
*/
|
||||
CURLcode SendBuffer(uint8_t const* buffer, size_t bufferSize);
|
||||
CURLcode SendBuffer(Context const& context, uint8_t const* buffer, size_t bufferSize);
|
||||
|
||||
/**
|
||||
* @brief This function is used after sending an HTTP request to the server to read the HTTP
|
||||
* RawResponse from wire until the end of headers only.
|
||||
*
|
||||
* @param reUseInternalBUffer Indicates whether the internal buffer should be reused.
|
||||
* @param context #Context so that operation can be canceled.
|
||||
* @param reuseInternalBuffer Indicates whether the internal buffer should be reused.
|
||||
*
|
||||
* @return CURL_OK when an HTTP response is created.
|
||||
*/
|
||||
void ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUffer = false);
|
||||
void ReadStatusLineAndHeadersFromRawResponse(
|
||||
Context const& context,
|
||||
bool reuseInternalBuffer = false);
|
||||
|
||||
/**
|
||||
* @brief Reads from inner buffer or from Wire until chunkSize is parsed and converted to
|
||||
* unsigned long long
|
||||
*
|
||||
* @param context #Context so that operation can be canceled.
|
||||
*/
|
||||
void ParseChunkSize();
|
||||
void ParseChunkSize(Context const& context);
|
||||
|
||||
/**
|
||||
* @brief This function is used when working with streams to pull more data from the wire.
|
||||
* Function will try to keep pulling data from socket until the buffer is all written or until
|
||||
* there is no more data to get from the socket.
|
||||
*
|
||||
* @param context #Context so that operation can be canceled.
|
||||
* @param buffer ptr to buffer where to copy bytes from socket.
|
||||
* @param bufferSize size of the buffer and the requested bytes to be pulled from wire.
|
||||
* @return return the numbers of bytes pulled from socket. It can be less than what it was
|
||||
* requested.
|
||||
*/
|
||||
int64_t ReadFromSocket(uint8_t* buffer, int64_t bufferSize);
|
||||
int64_t ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize);
|
||||
|
||||
/**
|
||||
* @brief Last HTTP status code read.
|
||||
@ -572,7 +575,7 @@ namespace Azure { namespace Core { namespace Http {
|
||||
* @brief Function will use the HTTP request received in constructor to perform a network call
|
||||
* based on the HTTP request configuration.
|
||||
*
|
||||
* @param context TBD
|
||||
* @param context #Context so that operation can be canceled.
|
||||
* @return CURLE_OK when the network call is completed successfully.
|
||||
*/
|
||||
CURLcode Perform(Context const& context);
|
||||
@ -585,8 +588,21 @@ namespace Azure { namespace Core { namespace Http {
|
||||
*/
|
||||
std::unique_ptr<Azure::Core::Http::RawResponse> GetResponse();
|
||||
|
||||
/**
|
||||
* @brief Implement #BodyStream length.
|
||||
*
|
||||
* @return The size of the payload.
|
||||
*/
|
||||
int64_t Length() const override { return this->m_contentLength; }
|
||||
|
||||
/**
|
||||
* @brief Implement #BodyStream read. Calling this function pulls data from the wire.
|
||||
*
|
||||
* @param context #Context so that operation can be canceled.
|
||||
* @param buffer Buffer where data from wire is written to.
|
||||
* @param count The number of bytes to read from the network.
|
||||
* @return The actual number of bytes read from the network.
|
||||
*/
|
||||
int64_t Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count) override;
|
||||
};
|
||||
|
||||
@ -599,7 +615,7 @@ namespace Azure { namespace Core { namespace Http {
|
||||
/**
|
||||
* @brief Implements interface to send an HTTP Request and produce an HTTP RawResponse
|
||||
*
|
||||
* @param context TBD
|
||||
* @param context #Context so that operation can be canceled.
|
||||
* @param request an HTTP Request to be send.
|
||||
* @return unique ptr to an HTTP RawResponse.
|
||||
*/
|
||||
|
||||
@ -11,6 +11,8 @@
|
||||
#ifdef WINDOWS
|
||||
#include <winsock2.h> // for WSAPoll();
|
||||
#endif
|
||||
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
@ -42,10 +44,13 @@ enum class PollSocketDirection
|
||||
* @param socketFileDescriptor socket descriptor.
|
||||
* @param direction poll events for read or write socket.
|
||||
* @param timeout return if polling for more than \p timeout
|
||||
* @param context The context while polling that can be use to cancel waiting for socket.
|
||||
*
|
||||
* @return int with negative 1 upon any error, 0 on timeout or greater than zero if events were
|
||||
* detected (socket ready to be written/read)
|
||||
*/
|
||||
int pollSocketUntilEventOrTimeout(
|
||||
Azure::Core::Context const& context,
|
||||
curl_socket_t socketFileDescriptor,
|
||||
PollSocketDirection direction,
|
||||
long timeout)
|
||||
@ -54,8 +59,7 @@ int pollSocketUntilEventOrTimeout(
|
||||
#ifndef POSIX
|
||||
#ifndef WINDOWS
|
||||
// platform does not support Poll().
|
||||
throw Azure::Core::Http::TransportException(
|
||||
"Error while sending request. Platform does not support Poll()");
|
||||
throw TransportException("Error while sending request. Platform does not support Poll()");
|
||||
#endif
|
||||
#endif
|
||||
|
||||
@ -72,16 +76,32 @@ int pollSocketUntilEventOrTimeout(
|
||||
poller.events = POLLOUT;
|
||||
}
|
||||
|
||||
// Call poll with the poller struct. Poll can handle multiple file descriptors by making an pollfd
|
||||
// array and passing the size of it as the second arg. Since we are only passing one fd, we use 1
|
||||
// as arg.
|
||||
// Call poll with the poller struct. Poll can handle multiple file descriptors by making an
|
||||
// pollfd array and passing the size of it as the second arg. Since we are only passing one fd,
|
||||
// we use 1 as arg.
|
||||
|
||||
// Cancelation is possible by calling poll() with small time intervals instead of using the
|
||||
// requested timeout. Default interval for calling poll() is 1 sec whenever arg timeout is
|
||||
// greater than 1 sec. Otherwise the interval is set to timeout
|
||||
long interval = 1000; // 1 second
|
||||
if (timeout < interval)
|
||||
{
|
||||
interval = timeout;
|
||||
}
|
||||
int result = 0;
|
||||
for (long counter = 0; counter < timeout && result == 0; counter = counter + interval)
|
||||
{
|
||||
// check cancelation
|
||||
context.ThrowIfCanceled();
|
||||
#ifdef POSIX
|
||||
return poll(&poller, 1, timeout);
|
||||
result = poll(&poller, 1, interval);
|
||||
#endif
|
||||
#ifdef WINDOWS
|
||||
// Windows Vista and greater.
|
||||
return WSAPoll(&poller, 1, timeout);
|
||||
result = WSAPoll(&poller, 1, interval);
|
||||
#endif
|
||||
}
|
||||
// result can be either 0 (timeout) or > 1 (socket ready)
|
||||
return result;
|
||||
}
|
||||
|
||||
#ifdef WINDOWS
|
||||
@ -122,7 +142,15 @@ inline void LogThis(std::string const& msg)
|
||||
}
|
||||
} // namespace
|
||||
|
||||
using namespace Azure::Core::Http;
|
||||
using Azure::Core::Http::CurlConnection;
|
||||
using Azure::Core::Http::CurlConnectionPool;
|
||||
using Azure::Core::Http::CurlSession;
|
||||
using Azure::Core::Http::CurlTransport;
|
||||
using Azure::Core::Http::HttpStatusCode;
|
||||
using Azure::Core::Http::LogClassification;
|
||||
using Azure::Core::Http::RawResponse;
|
||||
using Azure::Core::Http::Request;
|
||||
using Azure::Core::Http::TransportException;
|
||||
|
||||
std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request& request)
|
||||
{
|
||||
@ -154,12 +182,11 @@ std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request
|
||||
{
|
||||
case CURLE_COULDNT_RESOLVE_HOST:
|
||||
{
|
||||
throw Azure::Core::Http::CouldNotResolveHostException(
|
||||
"Could not resolve host " + request.GetUrl().GetHost());
|
||||
throw CouldNotResolveHostException("Could not resolve host " + request.GetUrl().GetHost());
|
||||
}
|
||||
default:
|
||||
{
|
||||
throw Azure::Core::Http::TransportException(
|
||||
throw TransportException(
|
||||
"Error while sending request. " + std::string(curl_easy_strerror(performing)));
|
||||
}
|
||||
}
|
||||
@ -175,7 +202,6 @@ std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request
|
||||
|
||||
CURLcode CurlSession::Perform(Context const& context)
|
||||
{
|
||||
AZURE_UNREFERENCED_PARAMETER(context);
|
||||
|
||||
// Get the socket that libcurl is using from handle. Will use this to wait while reading/writing
|
||||
// into wire
|
||||
@ -222,7 +248,7 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
}
|
||||
|
||||
this->m_logger("Parse server response");
|
||||
ReadStatusLineAndHeadersFromRawResponse();
|
||||
ReadStatusLineAndHeadersFromRawResponse(context);
|
||||
|
||||
// Upload body for PUT
|
||||
if (this->m_request.GetMethod() != HttpMethod::Put)
|
||||
@ -245,7 +271,7 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
{
|
||||
// If internal buffer has more data after the 100-continue means Server return an error.
|
||||
// We don't need to upload body, just parse the response from Server and return
|
||||
ReadStatusLineAndHeadersFromRawResponse(true);
|
||||
ReadStatusLineAndHeadersFromRawResponse(context, true);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -257,7 +283,7 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
}
|
||||
|
||||
this->m_logger("Upload completed. Parse server response");
|
||||
ReadStatusLineAndHeadersFromRawResponse();
|
||||
ReadStatusLineAndHeadersFromRawResponse(context);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -305,10 +331,21 @@ bool CurlSession::isUploadRequest()
|
||||
}
|
||||
|
||||
// Send buffer thru the wire
|
||||
CURLcode CurlSession::SendBuffer(uint8_t const* buffer, size_t bufferSize)
|
||||
CURLcode CurlSession::SendBuffer(Context const& context, uint8_t const* buffer, size_t bufferSize)
|
||||
{
|
||||
for (size_t sentBytesTotal = 0; sentBytesTotal < bufferSize;)
|
||||
{
|
||||
// check cancelation for each chunk of data.
|
||||
// Next loop is expected to be called at most 2 times:
|
||||
// The first time we call `curl_easy_send()`, if it return CURLE_AGAIN it would call
|
||||
// `pollSocketUntilEventOrTimeout` to wait for socket to be ready to write.
|
||||
// `pollSocketUntilEventOrTimeout` will then handle cancelation token.
|
||||
// If socket is not ready before the timeout, Exception is thrown.
|
||||
// When socket is ready, it calls curl_easy_send() again (second loop iteration). It is not
|
||||
// expected to return CURLE_AGAIN (since socket is ready), so, a chuck of data will be uploaded
|
||||
// and result will be CURLE_OK which breaks the loop. Also, getting other than CURLE_OK or
|
||||
// CURLE_AGAIN throws.
|
||||
context.ThrowIfCanceled();
|
||||
for (CURLcode sendResult = CURLE_AGAIN; sendResult == CURLE_AGAIN;)
|
||||
{
|
||||
size_t sentBytesPerRequest = 0;
|
||||
@ -328,18 +365,17 @@ CURLcode CurlSession::SendBuffer(uint8_t const* buffer, size_t bufferSize)
|
||||
}
|
||||
case CURLE_AGAIN:
|
||||
{
|
||||
// start polling operation
|
||||
// start polling operation with 1 min timeout
|
||||
auto pollUntilSocketIsReady = pollSocketUntilEventOrTimeout(
|
||||
this->m_curlSocket, PollSocketDirection::Write, 60000L);
|
||||
context, this->m_curlSocket, PollSocketDirection::Write, 60000L);
|
||||
|
||||
if (pollUntilSocketIsReady == 0)
|
||||
{
|
||||
throw Azure::Core::Http::TransportException("Timeout waiting for socket to upload.");
|
||||
throw TransportException("Timeout waiting for socket to upload.");
|
||||
}
|
||||
else if (pollUntilSocketIsReady < 0)
|
||||
{ // negative value, error while polling
|
||||
throw Azure::Core::Http::TransportException(
|
||||
"Error while polling for socket ready write");
|
||||
throw TransportException("Error while polling for socket ready write");
|
||||
}
|
||||
|
||||
// Ready to continue download.
|
||||
@ -381,7 +417,7 @@ CURLcode CurlSession::UploadBody(Context const& context)
|
||||
{
|
||||
break;
|
||||
}
|
||||
sendResult = SendBuffer(unique_buffer.get(), static_cast<size_t>(rawRequestLen));
|
||||
sendResult = SendBuffer(context, unique_buffer.get(), static_cast<size_t>(rawRequestLen));
|
||||
if (sendResult != CURLE_OK)
|
||||
{
|
||||
return sendResult;
|
||||
@ -398,7 +434,9 @@ CURLcode CurlSession::SendRawHttp(Context const& context)
|
||||
int64_t rawRequestLen = rawRequest.size();
|
||||
|
||||
CURLcode sendResult = SendBuffer(
|
||||
reinterpret_cast<uint8_t const*>(rawRequest.data()), static_cast<size_t>(rawRequestLen));
|
||||
context,
|
||||
reinterpret_cast<uint8_t const*>(rawRequest.data()),
|
||||
static_cast<size_t>(rawRequestLen));
|
||||
|
||||
if (sendResult != CURLE_OK || this->m_request.GetMethod() == HttpMethod::Put)
|
||||
{
|
||||
@ -408,7 +446,7 @@ CURLcode CurlSession::SendRawHttp(Context const& context)
|
||||
return this->UploadBody(context);
|
||||
}
|
||||
|
||||
void CurlSession::ParseChunkSize()
|
||||
void CurlSession::ParseChunkSize(Context const& context)
|
||||
{
|
||||
// Use this string to construct the chunk size. This is because we could have an internal
|
||||
// buffer like [headers...\r\n123], where 123 is chunk size but we still need to pull more
|
||||
@ -436,7 +474,7 @@ void CurlSession::ParseChunkSize()
|
||||
if (index + 1 == this->m_innerBufferSize)
|
||||
{ // on last index. Whatever we read is the BodyStart here
|
||||
this->m_innerBufferSize
|
||||
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
else
|
||||
@ -451,7 +489,7 @@ void CurlSession::ParseChunkSize()
|
||||
if (keepPolling)
|
||||
{ // Read all internal buffer and \n was not found, pull from wire
|
||||
this->m_innerBufferSize
|
||||
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
}
|
||||
@ -459,7 +497,9 @@ void CurlSession::ParseChunkSize()
|
||||
}
|
||||
|
||||
// Read status line plus headers to create a response with no body
|
||||
void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUffer)
|
||||
void CurlSession::ReadStatusLineAndHeadersFromRawResponse(
|
||||
Context const& context,
|
||||
bool reuseInternalBuffer)
|
||||
{
|
||||
auto parser = ResponseBufferParser();
|
||||
auto bufferSize = int64_t();
|
||||
@ -468,7 +508,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUff
|
||||
while (!parser.IsParseCompleted())
|
||||
{
|
||||
int64_t bytesParsed = 0;
|
||||
if (reUseInternalBUffer)
|
||||
if (reuseInternalBuffer)
|
||||
{
|
||||
// parse from internal buffer. This means previous read from server got more than one
|
||||
// response. This happens when Server returns a 100-continue plus an error code
|
||||
@ -476,7 +516,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUff
|
||||
bytesParsed = parser.Parse(
|
||||
this->m_readBuffer + this->m_bodyStartInBuffer, static_cast<size_t>(bufferSize));
|
||||
// if parsing from internal buffer is not enough, do next read from wire
|
||||
reUseInternalBUffer = false;
|
||||
reuseInternalBuffer = false;
|
||||
// reset body start
|
||||
this->m_bodyStartInBuffer = -1;
|
||||
}
|
||||
@ -484,7 +524,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUff
|
||||
{
|
||||
// Try to fill internal buffer from socket.
|
||||
// If response is smaller than buffer, we will get back the size of the response
|
||||
bufferSize = ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
bufferSize = ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
// returns the number of bytes parsed up to the body Start
|
||||
bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
|
||||
}
|
||||
@ -505,7 +545,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUff
|
||||
// For NoContent status code, also need to set contentLength to 0.
|
||||
// https://github.com/Azure/azure-sdk-for-cpp/issues/406
|
||||
if (this->m_request.GetMethod() == HttpMethod::Head
|
||||
|| this->m_lastStatusCode == Azure::Core::Http::HttpStatusCode::NoContent)
|
||||
|| this->m_lastStatusCode == HttpStatusCode::NoContent)
|
||||
{
|
||||
this->m_contentLength = 0;
|
||||
this->m_bodyStartInBuffer = -1;
|
||||
@ -540,11 +580,11 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUff
|
||||
if (this->m_bodyStartInBuffer == -1)
|
||||
{ // if nothing on inner buffer, pull from wire
|
||||
this->m_innerBufferSize
|
||||
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 0;
|
||||
}
|
||||
|
||||
ParseChunkSize();
|
||||
ParseChunkSize(context);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -558,7 +598,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse(bool reUseInternalBUff
|
||||
}
|
||||
|
||||
// Read from curl session
|
||||
int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer, int64_t count)
|
||||
int64_t CurlSession::Read(Context const& context, uint8_t* buffer, int64_t count)
|
||||
{
|
||||
context.ThrowIfCanceled();
|
||||
|
||||
@ -580,14 +620,14 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer,
|
||||
else
|
||||
{ // end of buffer, pull data from wire
|
||||
this->m_innerBufferSize
|
||||
= ReadFromSocket(this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
= ReadFromSocket(context, this->m_readBuffer, Details::c_DefaultLibcurlReaderSize);
|
||||
this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n)
|
||||
}
|
||||
}
|
||||
// Reset session read counter for next chunk
|
||||
this->m_sessionTotalRead = 0;
|
||||
// get the size of next chunk
|
||||
ParseChunkSize();
|
||||
ParseChunkSize(context);
|
||||
|
||||
if (this->IsEOF())
|
||||
{ // after parsing next chunk, check if it is zero
|
||||
@ -637,7 +677,7 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer,
|
||||
|
||||
// Read from socket when no more data on internal buffer
|
||||
// For chunk request, read a chunk based on chunk size
|
||||
totalRead = ReadFromSocket(buffer, static_cast<size_t>(readRequestLength));
|
||||
totalRead = ReadFromSocket(context, buffer, static_cast<size_t>(readRequestLength));
|
||||
this->m_sessionTotalRead += totalRead;
|
||||
|
||||
// Reading 0 bytes means closed connection.
|
||||
@ -649,7 +689,7 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer,
|
||||
auto expectedToRead = this->m_isChunkedResponseType ? this->m_chunkSize : this->m_contentLength;
|
||||
if (this->m_sessionTotalRead < expectedToRead)
|
||||
{
|
||||
throw Azure::Core::Http::TransportException(
|
||||
throw TransportException(
|
||||
"Connection closed before getting full response or response is less than expected. "
|
||||
"Expected response length = "
|
||||
+ std::to_string(expectedToRead)
|
||||
@ -661,9 +701,18 @@ int64_t CurlSession::Read(Azure::Core::Context const& context, uint8_t* buffer,
|
||||
}
|
||||
|
||||
// Read from socket and return the number of bytes taken from socket
|
||||
int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize)
|
||||
int64_t CurlSession::ReadFromSocket(Context const& context, uint8_t* buffer, int64_t bufferSize)
|
||||
{
|
||||
// loop until read result is not CURLE_AGAIN
|
||||
// Next loop is expected to be called at most 2 times:
|
||||
// The first time it calls `curl_easy_recv()`, if it returns CURLE_AGAIN it would call
|
||||
// `pollSocketUntilEventOrTimeout` and wait for socket to be ready to read.
|
||||
// `pollSocketUntilEventOrTimeout` will then handle cancelation token.
|
||||
// If socket is not ready before the timeout, Exception is thrown.
|
||||
// When socket is ready, it calls curl_easy_recv() again (second loop iteration). It is not
|
||||
// expected to return CURLE_AGAIN (since socket is ready), so, a chuck of data will be downloaded
|
||||
// and result will be CURLE_OK which breaks the loop. Also, getting other than CURLE_OK or
|
||||
// CURLE_AGAIN throws.
|
||||
size_t readBytes = 0;
|
||||
for (CURLcode readResult = CURLE_AGAIN; readResult == CURLE_AGAIN;)
|
||||
{
|
||||
@ -675,16 +724,16 @@ int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize)
|
||||
case CURLE_AGAIN:
|
||||
{
|
||||
// start polling operation
|
||||
auto pollUntilSocketIsReady
|
||||
= pollSocketUntilEventOrTimeout(this->m_curlSocket, PollSocketDirection::Read, 60000L);
|
||||
auto pollUntilSocketIsReady = pollSocketUntilEventOrTimeout(
|
||||
context, this->m_curlSocket, PollSocketDirection::Read, 60000L);
|
||||
|
||||
if (pollUntilSocketIsReady == 0)
|
||||
{
|
||||
throw Azure::Core::Http::TransportException("Timeout waitting for socket to read.");
|
||||
throw TransportException("Timeout waiting for socket to read.");
|
||||
}
|
||||
else if (pollUntilSocketIsReady < 0)
|
||||
{ // negative value, error while polling
|
||||
throw Azure::Core::Http::TransportException("Error while polling for socket ready read");
|
||||
throw TransportException("Error while polling for socket ready read");
|
||||
}
|
||||
|
||||
// Ready to continue download.
|
||||
@ -697,7 +746,7 @@ int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize)
|
||||
default:
|
||||
{
|
||||
// Error reading from socket
|
||||
throw Azure::Core::Http::TransportException(
|
||||
throw TransportException(
|
||||
"Error while reading from network socket. CURLE code: " + std::to_string(readResult)
|
||||
+ ". " + std::string(curl_easy_strerror(readResult)));
|
||||
}
|
||||
@ -709,10 +758,7 @@ int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize)
|
||||
return readBytes;
|
||||
}
|
||||
|
||||
std::unique_ptr<Azure::Core::Http::RawResponse> CurlSession::GetResponse()
|
||||
{
|
||||
return std::move(this->m_response);
|
||||
}
|
||||
std::unique_ptr<RawResponse> CurlSession::GetResponse() { return std::move(this->m_response); }
|
||||
|
||||
int64_t CurlSession::ResponseBufferParser::Parse(
|
||||
uint8_t const* const buffer,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user