Http/making upload chunk size overridle with Request option (#342)

* adding http request option to customize the upload chunk size for bodyStream
This commit is contained in:
Victor Vazquez 2020-07-29 00:16:36 -07:00 committed by GitHub
parent b0facd3dd5
commit 9563d9dd3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 274 additions and 87 deletions

View File

@ -14,9 +14,12 @@
namespace Azure { namespace Core { namespace Http {
// libcurl CURL_MAX_WRITE_SIZE is 16k.
constexpr auto UploadStreamPageSize = 1024 * 64;
constexpr auto LibcurlReaderSize = 1024;
namespace Details {
// libcurl CURL_MAX_WRITE_SIZE is 64k. Using same value for default uploading chunk size.
// This can be customizable in the HttpRequest
constexpr int64_t c_UploadDefaultChunkSize = 1024 * 64;
constexpr auto c_LibcurlReaderSize = 1024;
} // namespace Details
/**
* @brief Statefull component that controls sending an HTTP Request with libcurl thru the wire and
@ -45,15 +48,16 @@ namespace Azure { namespace Core { namespace Http {
};
/**
* @brief stateful component used to read and parse a buffer to construct a valid HTTP RawResponse.
* @brief stateful component used to read and parse a buffer to construct a valid HTTP
* RawResponse.
*
* It uses an internal string as buffers to accumulate a response token (version, code, header,
* etc) until the next delimiter is found. Then it uses this string to keep building the HTTP
* RawResponse.
*
* @remark Only status line and headers are parsed and built. Body is ignored by this component.
* A libcurl session will use this component to build and return the HTTP RawResponse with a body
* stream to the pipeline.
* A libcurl session will use this component to build and return the HTTP RawResponse with a
* body stream to the pipeline.
*/
class ResponseBufferParser {
private:
@ -63,9 +67,9 @@ namespace Azure { namespace Core { namespace Http {
*/
ResponseParserState state;
/**
* @brief Unique prt to a response. Parser will create an Initial-valid HTTP RawResponse and then
* it will append headers to it. This response is moved to a different owner once parsing is
* completed.
* @brief Unique ptr to a response. Parser will create an Initial-valid HTTP RawResponse and
* then it will append headers to it. This response is moved to a different owner once parsing
* is completed.
*
*/
std::unique_ptr<RawResponse> m_response;
@ -141,8 +145,9 @@ namespace Azure { namespace Core { namespace Http {
* @return Returns the index of the last parsed position. Returning a 0 means nothing was
* parsed and it is likely that the HTTP RawResponse is completed. Returning the same value as
* the buffer size means all buffer was parsed and the HTTP might be completed or not.
* Returning a value smaller than the buffer size will likely indicate that the HTTP RawResponse
* is completed and that the rest of the buffer contains part of the response body.
* Returning a value smaller than the buffer size will likely indicate that the HTTP
* RawResponse is completed and that the rest of the buffer contains part of the response
* body.
*/
int64_t Parse(uint8_t const* const buffer, int64_t const bufferSize);
@ -182,8 +187,8 @@ namespace Azure { namespace Core { namespace Http {
curl_socket_t m_curlSocket;
/**
* @brief unique ptr for the HTTP RawResponse. The session is responsable for creating the response
* once that an HTTP status line is received.
* @brief unique ptr for the HTTP RawResponse. The session is responsable for creating the
* response once that an HTTP status line is received.
*
*/
std::unique_ptr<RawResponse> m_response;
@ -252,7 +257,7 @@ namespace Azure { namespace Core { namespace Http {
* provide their own buffer to copy from socket when reading the HTTP body using streams.
*
*/
uint8_t m_readBuffer[LibcurlReaderSize]; // to work with libcurl custom read.
uint8_t m_readBuffer[Details::c_LibcurlReaderSize]; // to work with libcurl custom read.
/**
* @brief convenient function that indicates when the HTTP Request will need to upload a payload
@ -354,7 +359,7 @@ namespace Azure { namespace Core { namespace Http {
* Function will try to keep pulling data from socket until the buffer is all written or until
* there is no more data to get from the socket.
*
* @param buffer prt to buffer where to copy bytes from socket.
* @param buffer ptr to buffer where to copy bytes from socket.
* @param bufferSize size of the buffer and the requested bytes to be pulled from wire.
* @return return the numbers of bytes pulled from socket. It can be less than what it was
* requested.
@ -371,7 +376,7 @@ namespace Azure { namespace Core { namespace Http {
{
this->m_pCurl = curl_easy_init();
this->m_bodyStartInBuffer = -1;
this->m_innerBufferSize = LibcurlReaderSize;
this->m_innerBufferSize = Details::c_LibcurlReaderSize;
this->m_rawResponseEOF = false;
this->m_isChunkedResponseType = false;
this->m_uploadedBytes = 0;
@ -391,8 +396,8 @@ namespace Azure { namespace Core { namespace Http {
/**
* @brief Moved the ownership of the HTTP RawResponse out of the session.
*
* @return the unique ptr to the HTTP RawResponse or null if the HTTP RawResponse is not yet created
* or was moved before.
* @return the unique ptr to the HTTP RawResponse or null if the HTTP RawResponse is not yet
* created or was moved before.
*/
std::unique_ptr<Azure::Core::Http::RawResponse> GetResponse();

View File

@ -234,6 +234,11 @@ namespace Azure { namespace Core { namespace Http {
std::string GetQueryString() const;
// This value can be used to override the default value that an http transport adapter uses to
// read and upload chunks of data from the payload body stream. If it is not set, the transport
// adapter will decide chunk size.
int64_t m_uploadChunkSize = 0;
public:
explicit Request(
HttpMethod httpMethod,
@ -267,6 +272,7 @@ namespace Azure { namespace Core { namespace Http {
void AddQueryParameter(std::string const& name, std::string const& value);
void AddHeader(std::string const& name, std::string const& value);
void StartRetry(); // only called by retry policy
void SetUploadChunkSize(int64_t size) { this->m_uploadChunkSize = size; }
// Methods used by transport layer (and logger) to send request
HttpMethod GetMethod() const;
@ -275,6 +281,7 @@ namespace Azure { namespace Core { namespace Http {
std::map<std::string, std::string> GetHeaders() const;
BodyStream* GetBodyStream() { return this->m_bodyStream; }
std::string GetHTTPMessagePreBody() const;
int64_t GetUploadChunkSize() { return this->m_uploadChunkSize; }
bool IsDownloadViaStream() { return m_isDownloadViaStream; }
};

View File

@ -59,6 +59,12 @@ CURLcode CurlSession::Perform(Context& context)
{
this->m_request.AddHeader("Host", this->m_request.GetHost());
}
auto isContentLengthHeaderInRequest = headers.find("content-length");
if (isContentLengthHeaderInRequest == headers.end())
{
this->m_request.AddHeader(
"content-length", std::to_string(this->m_request.GetBodyStream()->Length()));
}
}
result = SetConnectOnly();
@ -118,7 +124,7 @@ CURLcode CurlSession::Perform(Context& context)
result = this->UploadBody(context);
if (result != CURLE_OK)
{
return result; // will throw trnasport exception before trying to read
return result; // will throw transport exception before trying to read
}
ReadStatusLineAndHeadersFromRawResponse();
return result;
@ -199,7 +205,7 @@ bool CurlSession::isUploadRequest()
CURLcode CurlSession::SetUrl()
{
return curl_easy_setopt(this->m_pCurl, CURLOPT_URL, this->m_request.GetEncodedUrl().c_str());
return curl_easy_setopt(this->m_pCurl, CURLOPT_URL, this->m_request.GetEncodedUrl().data());
}
CURLcode CurlSession::SetConnectOnly()
@ -247,15 +253,21 @@ CURLcode CurlSession::UploadBody(Context& context)
{
// Send body UploadStreamPageSize at a time (libcurl default)
// NOTE: if stream is on top a contiguous memory, we can avoid allocating this copying buffer
auto unique_buffer = std::make_unique<uint8_t[]>(UploadStreamPageSize);
auto streamBody = this->m_request.GetBodyStream();
CURLcode sendResult = CURLE_OK;
// reusing rawRequestLen variable to read
this->m_uploadedBytes = 0;
int64_t uploadChunkSize = this->m_request.GetUploadChunkSize();
if (uploadChunkSize <= 0)
{
// use default size
uploadChunkSize = Details::c_UploadDefaultChunkSize;
}
auto unique_buffer = std::make_unique<uint8_t[]>(static_cast<size_t>(uploadChunkSize));
while (true)
{
auto rawRequestLen = streamBody->Read(context, unique_buffer.get(), UploadStreamPageSize);
auto rawRequestLen = streamBody->Read(context, unique_buffer.get(), uploadChunkSize);
if (rawRequestLen == 0)
{
break;
@ -315,7 +327,8 @@ void CurlSession::ParseChunkSize()
if (index + 1 == this->m_innerBufferSize)
{ // on last index. Whatever we read is the BodyStart here
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_LibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
else
@ -328,7 +341,8 @@ void CurlSession::ParseChunkSize()
}
if (keepPolling)
{ // Read all internal buffer and \n was not found, pull from wire
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_LibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
}
@ -346,7 +360,7 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
{
// Try to fill internal buffer from socket.
// If response is smaller than buffer, we will get back the size of the response
bufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
bufferSize = ReadSocketToBuffer(this->m_readBuffer, Details::c_LibcurlReaderSize);
// returns the number of bytes parsed up to the body Start
auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
@ -398,7 +412,8 @@ void CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// Need to move body start after chunk size
if (this->m_bodyStartInBuffer == -1)
{ // if nothing on inner buffer, pull from wire
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_LibcurlReaderSize);
this->m_bodyStartInBuffer = 0;
}
@ -438,7 +453,8 @@ int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_
}
else
{ // end of buffer, pull data from wire
this->m_innerBufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
this->m_innerBufferSize
= ReadSocketToBuffer(this->m_readBuffer, Details::c_LibcurlReaderSize);
this->m_bodyStartInBuffer = 1; // jump first char (could be \r or \n)
}
}

View File

@ -5,12 +5,23 @@ cmake_minimum_required (VERSION 3.12)
set(TARGET_NAME "azure-core-test")
# Create test data for FileUpload test (100K) by writing 1K * 100 times
set(RANGE 0)
set(1K "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
file(WRITE ${CMAKE_BINARY_DIR}/fileData "")
while(RANGE LESS 100)
file(APPEND ${CMAKE_BINARY_DIR}/fileData "${1K}")
MATH(EXPR RANGE "${RANGE}+1")
endwhile()
add_compile_definitions(AZURE_TEST_DATA_PATH="${CMAKE_BINARY_DIR}")
project (${TARGET_NAME} LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
add_executable (
${TARGET_NAME}
file_upload.cpp
http.cpp
main.cpp
nullable.cpp

View File

@ -0,0 +1,185 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#ifdef POSIX
#include <fcntl.h>
#endif // Posix
#ifdef WINDOWS
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <Windows.h>
#endif // Windows
#include "transport_adapter.hpp"
//#include <iostream>
#include <response.hpp>
#include <string>
namespace Azure { namespace Core { namespace Test {
namespace Datails {
constexpr int64_t c_fileSize = 1024 * 100;
}
void TransportAdapter::checkResponseCode(Azure::Core::Http::HttpStatusCode code)
{
/* if (code != Azure::Core::Http::HttpStatusCode::Ok)
{
std::cout << static_cast<typename std::underlying_type<Http::HttpStatusCode>::type>(code);
return;
} */
EXPECT_TRUE(code == Azure::Core::Http::HttpStatusCode::Ok);
}
void TransportAdapter::CheckBodyFromBuffer(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody)
{
auto body = response.GetBodyStream();
EXPECT_EQ(body, nullptr);
std::vector<uint8_t> bodyVector = response.GetBody();
int64_t bodySize = bodyVector.size();
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
}
if (expectedBody.size() > 0)
{
auto bodyString = std::string(bodyVector.begin(), bodyVector.end());
EXPECT_STREQ(expectedBody.data(), bodyString.data());
}
}
void TransportAdapter::CheckBodyFromStream(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody)
{
auto body = response.GetBodyStream();
EXPECT_NE(body, nullptr);
std::vector<uint8_t> bodyVector = Azure::Core::Http::BodyStream::ReadToEnd(context, *body);
int64_t bodySize = body->Length();
EXPECT_EQ(bodySize, size);
bodySize = bodyVector.size();
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
}
if (expectedBody.size() > 0)
{
auto bodyString = std::string(bodyVector.begin(), bodyVector.end());
EXPECT_STREQ(expectedBody.data(), bodyString.data());
}
}
TEST_F(TransportAdapter, customSizePutFromFile)
{
std::string host("http://httpbin.org/put");
std::string testDataPath(AZURE_TEST_DATA_PATH);
#ifdef POSIX
testDataPath.append("/fileData");
int f = open(testDataPath.data(), O_RDONLY);
#endif
#ifdef WINDOWS
testDataPath.append("\\fileData");
HANDLE f = CreateFile(
testDataPath.data(),
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_SEQUENTIAL_SCAN,
NULL);
#endif
auto requestBodyStream
= Azure::Core::Http::FileBodyStream(f, 0, Azure::Core::Test::Datails::c_fileSize);
auto request = Azure::Core::Http::Request(
Azure::Core::Http::HttpMethod::Put, host, &requestBodyStream, true);
// Make transport adapter to read all stream content for uploading instead of chunks
request.SetUploadChunkSize(Azure::Core::Test::Datails::c_fileSize);
{
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
}
}
TEST_F(TransportAdapter, customSizePutFromFileDefault)
{
std::string host("http://httpbin.org/put");
std::string testDataPath(AZURE_TEST_DATA_PATH);
#ifdef POSIX
testDataPath.append("/fileData");
int f = open(testDataPath.data(), O_RDONLY);
#endif
#ifdef WINDOWS
testDataPath.append("\\fileData");
HANDLE f = CreateFile(
testDataPath.data(),
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_SEQUENTIAL_SCAN,
NULL);
#endif
auto requestBodyStream
= Azure::Core::Http::FileBodyStream(f, 0, Azure::Core::Test::Datails::c_fileSize);
auto request = Azure::Core::Http::Request(
Azure::Core::Http::HttpMethod::Put, host, &requestBodyStream, true);
// Make transport adapter to read default chunk size
{
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
}
}
TEST_F(TransportAdapter, customSizePutFromFileBiggerPage)
{
std::string host("http://httpbin.org/put");
std::string testDataPath(AZURE_TEST_DATA_PATH);
#ifdef POSIX
testDataPath.append("/fileData");
int f = open(testDataPath.data(), O_RDONLY);
#endif
#ifdef WINDOWS
testDataPath.append("\\fileData");
HANDLE f = CreateFile(
testDataPath.data(),
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_SEQUENTIAL_SCAN,
NULL);
#endif
auto requestBodyStream
= Azure::Core::Http::FileBodyStream(f, 0, Azure::Core::Test::Datails::c_fileSize);
auto request = Azure::Core::Http::Request(
Azure::Core::Http::HttpMethod::Put, host, &requestBodyStream, true);
// Make transport adapter to read more than file size (5Mb)
request.SetUploadChunkSize(Azure::Core::Test::Datails::c_fileSize * 5);
{
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromStream(*response, expectedResponseBodySize);
}
}
}}} // namespace Azure::Core::Test

View File

@ -3,7 +3,6 @@
#include "transport_adapter.hpp"
#include <context.hpp>
#include <iostream>
#include <response.hpp>
#include <string>
@ -25,63 +24,6 @@ namespace Azure { namespace Core { namespace Test {
Azure::Core::Http::HttpPipeline TransportAdapter::pipeline(policies);
Azure::Core::Context TransportAdapter::context = Azure::Core::GetApplicationContext();
static void checkResponseCode(Azure::Core::Http::HttpStatusCode code)
{
if (code != Azure::Core::Http::HttpStatusCode::Ok)
{
std::cout << static_cast<typename std::underlying_type<Http::HttpStatusCode>::type>(code);
return;
}
EXPECT_TRUE(code == Azure::Core::Http::HttpStatusCode::Ok);
}
void TransportAdapter::CheckBodyFromBuffer(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody)
{
auto body = response.GetBodyStream();
EXPECT_EQ(body, nullptr);
std::vector<uint8_t> bodyVector = response.GetBody();
int64_t bodySize = bodyVector.size();
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
}
if (expectedBody.size() > 0)
{
auto bodyString = std::string(bodyVector.begin(), bodyVector.end());
EXPECT_STREQ(expectedBody.data(), bodyString.data());
}
}
void TransportAdapter::CheckBodyFromStream(
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody)
{
auto body = response.GetBodyStream();
EXPECT_NE(body, nullptr);
std::vector<uint8_t> bodyVector = Azure::Core::Http::BodyStream::ReadToEnd(context, *body);
int64_t bodySize = body->Length();
EXPECT_EQ(bodySize, size);
bodySize = bodyVector.size();
if (size > 0)
{ // only for known body size
EXPECT_EQ(bodyVector.size(), size);
}
if (expectedBody.size() > 0)
{
auto bodyString = std::string(bodyVector.begin(), bodyVector.end());
EXPECT_STREQ(expectedBody.data(), bodyString.data());
}
} // namespace Test
TEST_F(TransportAdapter, get)
{
std::string host("http://httpbin.org/get");
@ -342,4 +284,23 @@ namespace Azure { namespace Core { namespace Test {
EXPECT_STREQ(result.data(), std::string("").data());
}
TEST_F(TransportAdapter, customSizePut)
{
std::string host("http://httpbin.org/put");
// PUT 1MB
auto requestBodyVector = std::vector<uint8_t>(1024 * 1024, 'x');
auto bodyRequest = Azure::Core::Http::MemoryBodyStream(requestBodyVector);
auto request
= Azure::Core::Http::Request(Azure::Core::Http::HttpMethod::Put, host, &bodyRequest);
// Make transport adapter to read all stream content for uploading instead of chunks
request.SetUploadChunkSize(1024 * 1024);
{
auto response = pipeline.Send(context, request);
checkResponseCode(response->GetStatusCode());
auto expectedResponseBodySize = std::stoull(response->GetHeaders().at("content-length"));
CheckBodyFromBuffer(*response, expectedResponseBodySize);
}
}
}}} // namespace Azure::Core::Test

View File

@ -27,6 +27,8 @@ namespace Azure { namespace Core { namespace Test {
Azure::Core::Http::RawResponse& response,
int64_t size,
std::string expectedBody = std::string(""));
static void checkResponseCode(Azure::Core::Http::HttpStatusCode code);
};
}}} // namespace Azure::Core::Test