Memory Stream - File Stream (win/lin) - HTTP Request / Response - Updates (#236)

* make stream use int64_t instead of uint64_t
This commit is contained in:
Victor Vazquez 2020-07-03 22:22:16 -07:00 committed by GitHub
parent c22e4f8386
commit 9fb4119ccb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1278 additions and 1045 deletions

View File

@ -22,6 +22,9 @@ if(DEFINED ENV{VCPKG_DEFAULT_TRIPLET} AND NOT DEFINED VCPKG_TARGET_TRIPLET)
set(VCPKG_TARGET_TRIPLET "$ENV{VCPKG_DEFAULT_TRIPLET}" CACHE STRING "")
endif()
# Define WINDOWS of POSIX for specific code implementations like FileStream
include(DefinePlatform)
# Project definition
project(az LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)

View File

@ -1,28 +1,40 @@
{
"configurations": [
"configurations": [
{
"name": "x64-Debug",
"generator": "Ninja",
"configurationType": "Debug",
"inheritEnvironments": [ "msvc_x64_x64" ],
"buildRoot": "${projectDir}\\out\\build\\${name}",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeCommandArgs": "-DINSTALL_GTEST=OFF -DBUILD_TESTING=ON -DBUILD_CURL_TRANSPORT=ON -DBUILD_STORAGE_SAMPLES=ON",
"buildCommandArgs": "-v",
"ctestCommandArgs": "",
"variables": [
{
"name": "x64-Debug",
"generator": "Ninja",
"configurationType": "Debug",
"inheritEnvironments": [ "msvc_x64_x64" ],
"buildRoot": "${projectDir}\\out\\build\\${name}",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeCommandArgs": "-DINSTALL_GTEST=OFF -DBUILD_TESTING=ON -DBUILD_CURL_TRANSPORT=ON -DBUILD_STORAGE_SAMPLES=ON",
"buildCommandArgs": "-v",
"ctestCommandArgs": "",
"variables": []
},
{
"name": "x86-Debug",
"generator": "Ninja",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\out\\build\\${name}",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
"ctestCommandArgs": "",
"inheritEnvironments": [ "msvc_x86" ],
"variables": []
"name": "VCPKG_TARGET_TRIPLET",
"value": "x64-windows-static",
"type": "STRING"
}
]
]
},
{
"name": "x86-Debug",
"generator": "Ninja",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\out\\build\\${name}",
"installRoot": "${projectDir}\\out\\install\\${name}",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
"ctestCommandArgs": "",
"inheritEnvironments": [ "msvc_x86" ],
"variables": [
{
"name": "VCPKG_TARGET_TRIPLET",
"value": "x86-windows-static",
"type": "STRING"
}
]
}
]
}

View File

@ -0,0 +1,18 @@
# Cmake variables:
# UNIX : is TRUE on all UNIX-like OS's, including Apple OS X and CygWin
# WIN32 : is TRUE on Windows. Prior to 2.8.4 this included CygWin
# MINGW : is TRUE when using the MinGW compiler in Windows
# MSYS : is TRUE when using the MSYS developer environment in Windows
# CYGWIN : is TRUE on Windows when using the CygWin version of cmake
# APPLE : is TRUE on Apple systems. Note this does not imply the
# system is Mac OS X, only that APPLE is #defined in C/C++
# header files.
if (WIN32 OR MINGW OR MSYS OR CYGWIN)
add_compile_definitions(WINDOWS)
endif ()
if (UNIX)
add_compile_definitions(POSIX)
endif ()

View File

@ -22,7 +22,7 @@ add_library (
src/http/policy.cpp
src/http/request.cpp
src/http/response.cpp
src/http/stream.cpp
src/http/body_stream.cpp
src/http/url.cpp
src/http/curl/curl.cpp
src/http/winhttp/win_http_transport.cpp

View File

@ -59,39 +59,39 @@ namespace Azure { namespace Core {
ContextValue(ContextValue&& other) noexcept : m_contextValueType(other.m_contextValueType)
{
switch (m_contextValueType)
{
case ContextValueType::Bool:
m_b = other.m_b;
break;
case ContextValueType::Int:
m_i = other.m_i;
break;
case ContextValueType::StdString:
::new (&m_s) std::string(std::move(other.m_s));
break;
case ContextValueType::UniquePtr:
::new (&m_p) std::unique_ptr<ValueBase>(std::move(other.m_p));
break;
case ContextValueType::Undefined:
break;
}
{
case ContextValueType::Bool:
m_b = other.m_b;
break;
case ContextValueType::Int:
m_i = other.m_i;
break;
case ContextValueType::StdString:
::new (&m_s) std::string(std::move(other.m_s));
break;
case ContextValueType::UniquePtr:
::new (&m_p) std::unique_ptr<ValueBase>(std::move(other.m_p));
break;
case ContextValueType::Undefined:
break;
}
}
~ContextValue()
{
switch (m_contextValueType)
{
case ContextValueType::StdString:
m_s.~basic_string();
break;
case ContextValueType::UniquePtr:
m_p.~unique_ptr<ValueBase>();
break;
case ContextValueType::Bool:
case ContextValueType::Int:
case ContextValueType::Undefined:
break;
}
{
case ContextValueType::StdString:
m_s.~basic_string();
break;
case ContextValueType::UniquePtr:
m_p.~unique_ptr<ValueBase>();
break;
case ContextValueType::Bool:
case ContextValueType::Int:
case ContextValueType::Undefined:
break;
}
}
ContextValue& operator=(const ContextValue& other) = delete;
@ -104,36 +104,36 @@ namespace Azure { namespace Core {
template <> inline const bool& ContextValue::Get() const noexcept
{
if (m_contextValueType != ContextValueType::Bool)
{
abort();
}
{
abort();
}
return m_b;
}
template <> inline const int& ContextValue::Get() const noexcept
{
if (m_contextValueType != ContextValueType::Int)
{
abort();
}
{
abort();
}
return m_i;
}
template <> inline const std::string& ContextValue::Get() const noexcept
{
if (m_contextValueType != ContextValueType::StdString)
{
abort();
}
{
abort();
}
return m_s;
}
template <> inline const std::unique_ptr<ValueBase>& ContextValue::Get() const noexcept
{
if (m_contextValueType != ContextValueType::UniquePtr)
{
abort();
}
{
abort();
}
return m_p;
}
@ -190,21 +190,30 @@ namespace Azure { namespace Core {
const ContextValue& operator[](const std::string& key)
{
if (!key.empty())
{
for (auto ptr = m_contextSharedState; ptr; ptr = ptr->Parent)
{
for (auto ptr = m_contextSharedState; ptr; ptr = ptr->Parent)
{
if (ptr->Key == key)
{
return ptr->Value;
}
}
if (ptr->Key == key)
{
return ptr->Value;
}
}
}
static ContextValue empty;
return empty;
}
void Cancel() { m_contextSharedState->CancelAt = time_point::min(); }
void ThrowIfCanceled()
{
if (CancelWhen() < std::chrono::system_clock::now())
{
// TODO: Runtime Exc
throw;
}
}
};
}} // namespace Azure::Core
}} // namespace Azure::Core

View File

@ -0,0 +1,176 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#pragma once
#ifdef POSIX
#include <unistd.h>
#endif
#ifdef WINDOWS
#define WIN32_LEAN_AND_MEAN
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <Windows.h>
#endif // Windows
#include <algorithm>
#include <context.hpp>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <vector>
namespace Azure { namespace Core { namespace Http {
// BodyStream is used to read data to/from a service
class BodyStream {
public:
virtual ~BodyStream() = default;
// Returns the length of the data; used with the HTTP Content-Length header
virtual int64_t Length() const = 0;
// Resets the stream back to the beginning (for retries)
// Derived classes that send data in an HTTP request MUST override this and implement it
// properly.
virtual void Rewind()
{
throw "Not Implemented"; // TODO: Replace with best practice as defined by guideline
};
// Reads more data; throws if error/canceled
// return copied size
virtual int64_t Read(Context& context, uint8_t* buffer, int64_t count) = 0;
// Keep reading until buffer is all fill out of the end of stream content is reached
static int64_t ReadToCount(Context& context, BodyStream& body, uint8_t* buffer, int64_t count);
static std::vector<uint8_t> ReadToEnd(Context& context, BodyStream& body);
};
class MemoryBodyStream : public BodyStream {
private:
const uint8_t* m_data;
int64_t m_length;
int64_t m_offset = 0;
public:
// Forbid constructor for rval so we don't end up storing dangling ptr
MemoryBodyStream(std::vector<uint8_t> const&&) = delete;
MemoryBodyStream(std::vector<uint8_t> const& buffer)
: MemoryBodyStream(buffer.data(), static_cast<int64_t>(buffer.size()))
{
}
// cast as vector from ptr and length
explicit MemoryBodyStream(const uint8_t* data, int64_t length) : m_data(data), m_length(length)
{
}
int64_t Length() const override { return this->m_length; }
int64_t Read(Context& context, uint8_t* buffer, int64_t count) override;
void Rewind() override { m_offset = 0; }
};
// Use for request with no body
class NullBodyStream : public Azure::Core::Http::BodyStream {
public:
explicit NullBodyStream() {}
int64_t Length() const override { return 0; }
void Rewind() override {}
int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override
{
(void)context;
(void)buffer;
(void)count;
return 0;
};
static NullBodyStream* GetNullBodyStream()
{
static NullBodyStream nullBodyStream;
return &nullBodyStream;
}
};
#ifdef POSIX
class FileBodyStream : public BodyStream {
private:
// in mutable
int m_fd;
int64_t m_baseOffset;
int64_t m_length;
// mutable
int64_t m_offset;
public:
FileBodyStream(int fd, int64_t offset, int64_t length)
: m_fd(fd), m_baseOffset(offset), m_length(length), m_offset(0)
{
}
// Rewind seek back to 0
void Rewind() override { this->m_offset = 0; }
int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override;
int64_t Length() const override { return this->m_length; };
};
#endif
#ifdef WINDOWS
class FileBodyStream : public BodyStream {
private:
// in mutable
HANDLE m_hFile;
int64_t m_baseOffset;
int64_t m_length;
// mutable
int64_t m_offset;
public:
FileBodyStream(HANDLE hFile, int64_t offset, int64_t length)
: m_hFile(hFile), m_baseOffset(offset), m_length(length), m_offset(0)
{
}
// Rewind seek back to 0
void Rewind() override { this->m_offset = 0; }
int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override;
int64_t Length() const override { return this->m_length; };
};
#endif // Windows
class LimitBodyStream : public BodyStream {
private:
BodyStream* m_inner;
int64_t m_length;
int64_t m_bytesRead = 0;
public:
LimitBodyStream(BodyStream* inner, int64_t max_length)
: m_inner(inner), m_length(std::min(inner->Length(), max_length))
{
}
int64_t Length() const override { return this->m_length; }
void Rewind() override
{
this->m_inner->Rewind();
this->m_bytesRead = 0;
}
int64_t Read(Context& context, uint8_t* buffer, int64_t count) override;
};
}}} // namespace Azure::Core::Http

View File

@ -14,8 +14,8 @@
namespace Azure { namespace Core { namespace Http {
constexpr auto UploadStreamPageSize = 1024 * 64;
constexpr auto LibcurlReaderSize = 100;
constexpr auto UploadStreamPageSize = 1024;
constexpr auto LibcurlReaderSize = 1024;
/**
* @brief Statefull component that controls sending an HTTP Request with libcurl thru the wire and
@ -29,7 +29,7 @@ namespace Azure { namespace Core { namespace Http {
* @remarks This component is expected to be used by an HTTP Transporter to ensure that
* transporter to be re usuable in multiple pipelines while every call to network is unique.
*/
class CurlSession {
class CurlSession : public BodyStream {
private:
/**
* @brief Enum used by ResponseBufferParser to control the parsing internal state while building
@ -111,7 +111,7 @@ namespace Azure { namespace Core { namespace Http {
* @param bufferSize Indicates the size of the buffer.
* @return Returns the index of the last parsed position from buffer.
*/
size_t BuildStatusCode(uint8_t const* const buffer, size_t const bufferSize);
int64_t BuildStatusCode(uint8_t const* const buffer, int64_t const bufferSize);
/**
* @brief This method is invoked by the Parsing process if the internal state is set to
@ -123,7 +123,7 @@ namespace Azure { namespace Core { namespace Http {
* @return Returns the index of the last parsed position from buffer. When the returned value
* is smaller than the body size, means there is part of the body response in the buffer.
*/
size_t BuildHeader(uint8_t const* const buffer, size_t const bufferSize);
int64_t BuildHeader(uint8_t const* const buffer, int64_t const bufferSize);
public:
/**
@ -152,7 +152,7 @@ namespace Azure { namespace Core { namespace Http {
* Returning a value smaller than the buffer size will likely indicate that the HTTP Response
* is completed and that the rest of the buffer contains part of the response body.
*/
size_t Parse(uint8_t const* const buffer, size_t const bufferSize);
int64_t Parse(uint8_t const* const buffer, int64_t const bufferSize);
/**
* @brief Indicates when the parser has completed parsing and building the HTTP Response.
@ -222,7 +222,7 @@ namespace Azure { namespace Core { namespace Http {
* decide how much data to take from the inner buffer before pulling more data from network.
*
*/
size_t m_bodyStartInBuffer;
int64_t m_bodyStartInBuffer;
/**
* @brief Control field to handle the number of bytes containing relevant data within the
@ -230,7 +230,7 @@ namespace Azure { namespace Core { namespace Http {
* from wire into it, it can be holding less then N bytes.
*
*/
size_t m_innerBufferSize;
int64_t m_innerBufferSize;
/**
* @brief Defines the strategy to read a body from an HTTP Response
@ -247,7 +247,9 @@ namespace Azure { namespace Core { namespace Http {
* are expecting to.
*
*/
uint64_t m_contentLength;
int64_t m_contentLength;
int64_t m_sessionTotalRead = 0;
/**
* @brief Internal buffer from a session used to read bytes from a socket. This buffer is only
@ -323,7 +325,7 @@ namespace Azure { namespace Core { namespace Http {
*
* @return CURL_OK when response is sent successfully.
*/
CURLcode HttpRawSend();
CURLcode HttpRawSend(Context& context);
/**
* @brief This method will use libcurl socket to write all the bytes from buffer.
@ -334,7 +336,7 @@ namespace Azure { namespace Core { namespace Http {
* @param bufferSize size of the buffer to send.
* @return CURL_OK when response is sent successfully.
*/
CURLcode SendBuffer(uint8_t* buffer, size_t bufferSize);
CURLcode SendBuffer(uint8_t const* buffer, size_t bufferSize);
/**
* @brief This function is used after sending an HTTP request to the server to read the HTTP
@ -354,8 +356,7 @@ namespace Azure { namespace Core { namespace Http {
* @return return the numbers of bytes pulled from socket. It can be less than what it was
* requested.
*/
uint64_t ReadSocketToBuffer(uint8_t* buffer, size_t bufferSize);
uint64_t ReadChunkedBody(uint8_t* buffer, uint64_t bufferSize, uint64_t offset);
int64_t ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize);
public:
/**
@ -387,17 +388,23 @@ namespace Azure { namespace Core { namespace Http {
*/
std::unique_ptr<Azure::Core::Http::Response> GetResponse();
/**
* @brief Helper method for reading with a Stream. Function will figure it out where to get
* bytes from (either the libcurl socket of the internal buffer from session). The offset is
* how stream controls how much it was already read.
*
* @param buffer ptr to a buffer where to write bytes from HTTP Response body.
* @param bufferSize size of the buffer.
* @param offset the number of bytes previously read.
* @return the number of bytes read.
*/
uint64_t ReadWithOffset(uint8_t* buffer, uint64_t bufferSize, uint64_t offset);
int64_t Length() const override
{
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked
|| this->m_bodyLengthType == ResponseBodyLengthType::ReadToCloseConnection)
{
return -1;
}
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked)
{
return 0;
}
return this->m_contentLength;
}
void Rewind() override {}
int64_t Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count) override;
};
/**
@ -416,85 +423,4 @@ namespace Azure { namespace Core { namespace Http {
std::unique_ptr<Response> Send(Context& context, Request& request) override;
};
/**
* @brief concrete implementation of a body stream to read bytes for the HTTP body using libcurl
* handler.
*/
class CurlBodyStream : public Azure::Core::Http::BodyStream {
private:
/**
* @brief length of the entire HTTP Response body.
*
*/
uint64_t m_length;
/**
* @brief reference to a Curl Session with all the configuration to be used to read from wire.
*
*/
CurlSession* m_curlSession;
/**
* @brief Numbers of bytes already read.
*
*/
uint64_t m_offset;
bool m_unknownSize;
public:
/**
* @brief Construct a new Curl Body Stream object.
*
* @param length size of the HTTP Response body.
* @param curlSession reference to a libcurl session that contains the libcurl handler to be
* used.
*/
CurlBodyStream(uint64_t length, CurlSession* curlSession)
: m_length(length), m_curlSession(curlSession), m_offset(0)
{
}
CurlBodyStream(CurlSession* curlSession)
: m_length(0), m_curlSession(curlSession), m_offset(0), m_unknownSize(true)
{
}
/**
* @brief Gets the length of the HTTP Response body.
*
* @return uint64_t
*/
uint64_t Length() const override { return this->m_length; }
/**
* @brief Gets the number of bytes received on count from netwok. Copies the bytes to the
* buffer.
*
* @param buffer ptr to a buffer where to copy bytes from network.
* @param count number of bytes to copy from network into buffer.
* @return the number of read and copied bytes from network to buffer.
*/
uint64_t Read(uint8_t* buffer, uint64_t count) override
{
if (this->m_length == this->m_offset && !this->m_unknownSize)
{
return 0;
}
// Read bytes from curl into buffer. As max as the length of Stream is allowed
auto readCount = this->m_curlSession->ReadWithOffset(buffer, count, this->m_offset);
this->m_offset += readCount;
return readCount;
}
/**
* @brief clean up heap. Removes the libcurl session and stream from the heap.
*
* @remark calling this method deletes the stream.
*
*/
void Close() override{};
};
}}} // namespace Azure::Core::Http

View File

@ -3,7 +3,7 @@
#pragma once
#include "stream.hpp"
#include "body_stream.hpp"
#include <algorithm>
#include <internal/contract.hpp>
@ -214,8 +214,8 @@ namespace Azure { namespace Core { namespace Http {
std::map<std::string, std::string> m_headers;
std::map<std::string, std::string> m_retryHeaders;
std::map<std::string, std::string> m_retryQueryParameters;
// Work only with streams
std::unique_ptr<BodyStream> m_bodyStream;
BodyStream* m_bodyStream;
// flag to know where to insert header
bool m_retryModeEnabled;
@ -233,14 +233,17 @@ namespace Azure { namespace Core { namespace Http {
std::string GetQueryString() const;
public:
Request(HttpMethod httpMethod, std::string const& url, std::unique_ptr<BodyStream> bodyStream)
: m_method(std::move(httpMethod)), m_url(url), m_bodyStream(std::move(bodyStream)),
Request(HttpMethod httpMethod, std::string const& url, BodyStream* bodyStream)
: m_method(std::move(httpMethod)), m_url(url), m_bodyStream(bodyStream),
m_retryModeEnabled(false)
{
}
// Typically used for GET with no request body.
Request(HttpMethod httpMethod, std::string const& url) : Request(httpMethod, url, nullptr) {}
Request(HttpMethod httpMethod, std::string const& url)
: Request(httpMethod, url, NullBodyStream::GetNullBodyStream())
{
}
// Methods used to build HTTP request
void AppendPath(std::string const& path);
@ -253,7 +256,7 @@ namespace Azure { namespace Core { namespace Http {
std::string GetEncodedUrl() const; // should call URL encode
std::string GetHost() const;
std::map<std::string, std::string> GetHeaders() const;
BodyStream* GetBodyStream();
BodyStream* GetBodyStream() { return this->m_bodyStream; }
std::string GetHTTPMessagePreBody() const;
};
@ -323,18 +326,9 @@ namespace Azure { namespace Core { namespace Http {
std::map<std::string, std::string> const& GetHeaders();
std::unique_ptr<BodyStream> GetBodyStream()
{
if (this->m_bodyStream == nullptr)
{
// Moved before or not yet created
return nullptr;
}
// If m_bodyStream was moved before. nullpr is returned
return std::move(this->m_bodyStream);
}
// Allocates a buffer in heap and reads and copy stream content into it.
// util for any API that needs to get the content from stream as a buffer
static std::unique_ptr<std::vector<uint8_t>> ConstructBodyBufferFromStream(
BodyStream* const stream);
};
}}} // namespace Azure::Core::Http

View File

@ -37,10 +37,11 @@ namespace Azure { namespace Core { namespace Http {
/**
* @brief Starts the pipeline
* @param ctx A cancellation token. Can also be used to provide overrides to individual policies
* @param ctx A cancellation token. Can also be used to provide overrides to individual
* policies
* @param request The request to be processed
* @return unique_ptr<Response>
*/
*/
std::unique_ptr<Response> Send(Context& ctx, Request& request) const
{
return m_policies[0]->Send(ctx, request, NextHttpPolicy(0, &m_policies));

View File

@ -1,125 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#pragma once
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>
namespace Azure { namespace Core { namespace Http {
// BodyStream is used to read data to/from a service
class BodyStream {
public:
virtual ~BodyStream() = 0;
// Returns the length of the data; used with the HTTP Content-Length header
virtual uint64_t Length() const = 0;
// Resets the stream back to the beginning (for retries)
// Derived classes that send data in an HTTP request MUST override this and implement it
// properly.
virtual void Rewind()
{
throw "Not Implemented"; // TODO: Replace with best practice as defined by guideline
};
// Reads more data; throws if error/canceled
// return copied size
virtual uint64_t Read(/*Context& context, */ uint8_t* buffer, uint64_t count) = 0;
// Closes the stream; typically called after all data read or if an error occurs.
virtual void Close() = 0;
};
class MemoryBodyStream : public BodyStream {
private:
std::vector<uint8_t> m_buffer;
uint64_t m_offset = 0;
public:
MemoryBodyStream(std::vector<uint8_t> buffer) : m_buffer(std::move(buffer)) {}
// cast as vector from ptr and length
MemoryBodyStream(uint8_t* ptr, uint64_t length) : m_buffer(ptr, ptr + length) {}
uint64_t Length() const override { return this->m_buffer.size(); }
uint64_t Read(uint8_t* buffer, uint64_t count) override
{
uint64_t copy_length = std::min(count, (this->m_buffer.size() - m_offset));
// Copy what's left or just the count
std::memcpy(buffer, m_buffer.data() + m_offset, (size_t)copy_length);
// move position
m_offset += copy_length;
return copy_length;
}
void Rewind() override { m_offset = 0; }
void Close() override {}
};
/*
TODO: fix file to work multi-platform
class FileBodyStream : public BodyStream {
private:
FILE* stream;
uint64_t length;
public:
FileBodyStream(FILE* stream)
{
// set internal fields
this->stream = stream;
// calculate size seeking end...
this->length = fseeko64(stream, 0, SEEK_END);
// seek back to beggin
this->Rewind();
}
// Rewind seek back to 0
void Rewind() { rewind(this->stream); }
uint64_t Read( uint8_t* buffer, uint64_t count)
{
// do static cast here?
return (uint64_t)fread(buffer, 1, count, this->stream);
}
// close does nothing opp
void Close() { fclose(this->stream); }
}; */
class LimitBodyStream : public BodyStream {
BodyStream* m_inner;
uint64_t m_length;
uint64_t m_bytesRead = 0;
LimitBodyStream(BodyStream* inner, uint64_t max_length)
: m_inner(inner), m_length(std::min(inner->Length(), max_length))
{
}
uint64_t Length() const override { return this->m_length; }
void Rewind() override
{
this->m_inner->Rewind();
this->m_bytesRead = 0;
}
uint64_t Read(uint8_t* buffer, uint64_t count) override
{
// Read up to count or whatever length is remaining; whichever is less
uint64_t bytesRead
= m_inner->Read(buffer, std::min(count, this->m_length - this->m_bytesRead));
this->m_bytesRead += bytesRead;
return bytesRead;
}
void Close() override { this->m_inner->Close(); }
};
}}} // namespace Azure::Core::Http

View File

@ -2,10 +2,10 @@
// SPDX-License-Identifier: MIT
#include <credentials/credentials.hpp>
#include <http/body_stream.hpp>
#include <http/curl/curl.hpp>
#include <http/http.hpp>
#include <http/pipeline.hpp>
#include <http/stream.hpp>
#include <iomanip>
#include <sstream>
#include <stdexcept>
@ -64,20 +64,14 @@ AccessToken ClientSecretCredential::GetToken(
}
auto const bodyString = body.str();
std::vector<std::uint8_t> bodyVec;
bodyVec.reserve(bodyString.size());
for (auto c : bodyString)
{
bodyVec.push_back(static_cast<std::uint8_t>(c));
}
auto bodyStream
= std::make_unique<Http::MemoryBodyStream>((uint8_t*)bodyString.data(), bodyString.size());
auto bodyStream = std::make_unique<Http::MemoryBodyStream>(bodyVec);
Http::Request request(Http::HttpMethod::Post, url.str(), std::move(bodyStream));
Http::Request request(Http::HttpMethod::Post, url.str(), bodyStream.get());
bodyStream.release();
request.AddHeader("Content-Type", "application/x-www-form-urlencoded");
request.AddHeader("Content-Length", std::to_string(bodyVec.size()));
request.AddHeader("Content-Length", std::to_string(bodyString.size()));
std::shared_ptr<Http::HttpTransport> transport = std::make_unique<Http::CurlTransport>();
@ -114,8 +108,11 @@ AccessToken ClientSecretCredential::GetToken(
std::string responseBody(static_cast<std::string::size_type>(responseStreamLength), 0);
responseStream->Read(
static_cast<std::uint8_t*>(static_cast<void*>(&responseBody[0])), responseStreamLength);
Azure::Core::Http::BodyStream::ReadToCount(
context,
*responseStream,
static_cast<std::uint8_t*>(static_cast<void*>(&responseBody[0])),
responseStreamLength);
// TODO: use JSON parser.
auto const responseBodySize = responseBody.size();

View File

@ -0,0 +1,124 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#ifdef POSIX
#include <unistd.h>
#endif
#ifdef WINDOWS
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <Windows.h>
#endif // Windows
#include <algorithm>
#include <context.hpp>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <http/body_stream.hpp>
#include <memory>
#include <vector>
using namespace Azure::Core::Http;
// Keep reading until buffer is all fill out of the end of stream content is reached
int64_t BodyStream::ReadToCount(Context& context, BodyStream& body, uint8_t* buffer, int64_t count)
{
int64_t totalRead = 0;
for (;;)
{
int64_t readBytes = body.Read(context, buffer + totalRead, count - totalRead);
totalRead += readBytes;
// Reach all of buffer size
if (totalRead == count || readBytes == 0)
{
return totalRead;
}
}
}
std::vector<uint8_t> BodyStream::ReadToEnd(Context& context, BodyStream& body)
{
constexpr int64_t chunkSize = 1024 * 8;
auto buffer = std::vector<uint8_t>();
for (auto chunkNumber = 0;; chunkNumber++)
{
buffer.resize((chunkNumber + 1) * chunkSize);
int64_t readBytes
= ReadToCount(context, body, buffer.data() + (chunkNumber * chunkSize), chunkSize);
if (readBytes < chunkSize)
{
buffer.resize(static_cast<size_t>((chunkNumber * chunkSize) + readBytes));
return buffer;
}
}
}
int64_t MemoryBodyStream::Read(Context& context, uint8_t* buffer, int64_t count)
{
context.ThrowIfCanceled();
int64_t copy_length = std::min(count, static_cast<int64_t>(this->m_length - this->m_offset));
// Copy what's left or just the count
std::memcpy(buffer, this->m_data + m_offset, static_cast<size_t>(copy_length));
// move position
m_offset += copy_length;
return copy_length;
}
#ifdef POSIX
int64_t FileBodyStream::Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count)
{
context.ThrowIfCanceled();
auto result = pread(
this->m_fd,
buffer,
std::min(count, this->m_length - this->m_offset),
this->m_baseOffset + this->m_offset);
this->m_offset += result;
return result;
}
#endif
#ifdef WINDOWS
int64_t FileBodyStream::Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count)
{
context.ThrowIfCanceled();
DWORD numberOfBytesRead;
auto o = OVERLAPPED();
o.Offset = (DWORD)(this->m_baseOffset + this->m_offset);
o.OffsetHigh = (DWORD)((this->m_baseOffset + this->m_offset) >> 32);
auto result = ReadFile(
this->m_hFile,
buffer,
// at most 4Gb to be read
(DWORD)std::min(
(uint64_t)0xFFFFFFFFUL, (uint64_t)std::min(count, (this->m_length - this->m_offset))),
&numberOfBytesRead,
&o);
(void)result;
this->m_offset += numberOfBytesRead;
return numberOfBytesRead;
}
#endif // Windows
int64_t LimitBodyStream::Read(Context& context, uint8_t* buffer, int64_t count)
{
(void)context;
// Read up to count or whatever length is remaining; whichever is less
uint64_t bytesRead
= m_inner->Read(context, buffer, std::min(count, this->m_length - this->m_bytesRead));
this->m_bytesRead += bytesRead;
return bytesRead;
}

View File

@ -10,8 +10,8 @@ using namespace Azure::Core::Http;
std::unique_ptr<Response> CurlTransport::Send(Context& context, Request& request)
{
// Create CurlSession in heap. We will point to it from response's stream to keep it alive
CurlSession* session = new CurlSession(request);
// Create CurlSession to perform request
auto session = std::make_unique<CurlSession>(request);
auto performing = session->Perform(context);
@ -31,7 +31,11 @@ std::unique_ptr<Response> CurlTransport::Send(Context& context, Request& request
}
}
return session->GetResponse();
// Move Response out of the session
auto response = session->GetResponse();
// Move the ownership of the CurlSession (bodyStream) to the response
response->SetBodyStream(std::move(session));
return response;
}
CURLcode CurlSession::Perform(Context& context)
@ -76,7 +80,7 @@ CURLcode CurlSession::Perform(Context& context)
}
// Send request
settingUp = HttpRawSend();
settingUp = HttpRawSend(context);
if (settingUp != CURLE_OK)
{
return settingUp;
@ -159,7 +163,7 @@ CURLcode CurlSession::SetConnectOnly()
}
// Send buffer thru the wire
CURLcode CurlSession::SendBuffer(uint8_t* buffer, size_t bufferSize)
CURLcode CurlSession::SendBuffer(uint8_t const* buffer, size_t bufferSize)
{
if (bufferSize <= 0)
{
@ -198,18 +202,19 @@ CURLcode CurlSession::SendBuffer(uint8_t* buffer, size_t bufferSize)
}
// custom sending to wire an http request
CURLcode CurlSession::HttpRawSend()
CURLcode CurlSession::HttpRawSend(Context& context)
{
auto rawRequest = this->m_request.GetHTTPMessagePreBody();
uint64_t rawRequestLen = rawRequest.size();
int64_t rawRequestLen = rawRequest.size();
CURLcode sendResult = SendBuffer((uint8_t*)rawRequest.data(), (size_t)rawRequestLen);
CURLcode sendResult = SendBuffer(
reinterpret_cast<uint8_t const*>(rawRequest.data()), static_cast<size_t>(rawRequestLen));
auto streamBody = this->m_request.GetBodyStream();
if (streamBody == nullptr)
if (streamBody->Length() == 0)
{
// Finish request with no body
uint8_t endOfRequest[] = "0";
uint8_t const endOfRequest[] = "0";
return SendBuffer(endOfRequest, 1); // need one more byte to end request
}
@ -219,8 +224,8 @@ CURLcode CurlSession::HttpRawSend()
auto buffer = unique_buffer.get();
while (rawRequestLen > 0)
{
rawRequestLen = streamBody->Read(buffer, UploadStreamPageSize);
sendResult = SendBuffer(buffer, (size_t)rawRequestLen);
rawRequestLen = streamBody->Read(context, buffer, UploadStreamPageSize);
sendResult = SendBuffer(buffer, static_cast<size_t>(rawRequestLen));
}
return sendResult;
}
@ -229,7 +234,7 @@ CURLcode CurlSession::HttpRawSend()
CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
{
auto parser = ResponseBufferParser();
auto bufferSize = uint64_t();
auto bufferSize = int64_t();
// Select a default reading strategy. // No content-length or Transfer-Encoding
this->m_bodyLengthType = ResponseBodyLengthType::ReadToCloseConnection;
@ -241,7 +246,7 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
bufferSize = ReadSocketToBuffer(this->m_readBuffer, LibcurlReaderSize);
// parse from buffer to create response
auto bytesParsed = parser.Parse(this->m_readBuffer, (size_t)bufferSize);
auto bytesParsed = parser.Parse(this->m_readBuffer, static_cast<size_t>(bufferSize));
// if end of headers is reach before the end of response, that's where body start
if (bytesParsed + 2 < bufferSize)
{
@ -256,7 +261,6 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
if (this->m_request.GetMethod() == HttpMethod::Head)
{
this->m_bodyLengthType = ResponseBodyLengthType::NoBody;
this->m_response->SetBodyStream(std::make_unique<CurlBodyStream>(0, this));
return CURLE_OK;
}
@ -272,8 +276,6 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// more (unique_ptr), so we save this value
this->m_contentLength = bodySize;
this->m_bodyLengthType = ResponseBodyLengthType::ContentLength;
// Move session to live inside the stream from response.
this->m_response->SetBodyStream(std::make_unique<CurlBodyStream>(bodySize, this));
return CURLE_OK;
}
@ -288,6 +290,7 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// set curl session to know response is chunked
// This will be used to remove chunked info while reading
this->m_bodyLengthType = ResponseBodyLengthType::Chunked;
return CURLE_OK;
}
}
@ -301,210 +304,126 @@ CURLcode CurlSession::ReadStatusLineAndHeadersFromRawResponse()
// Use unknown size CurlBodyStream. CurlSession will use the ResponseBodyLengthType to select a
// reading strategy
this->m_response->SetBodyStream(std::make_unique<CurlBodyStream>(this));
this->m_bodyLengthType = ResponseBodyLengthType::ReadToCloseConnection;
return CURLE_OK;
}
uint64_t CurlSession::ReadChunkedBody(uint8_t* buffer, uint64_t bufferSize, uint64_t offset)
// Read from curl session
int64_t CurlSession::Read(Azure::Core::Context& context, uint8_t* buffer, int64_t count)
{
// Remove the chunk info up to the next delimiter \r\n
if (offset == 0)
context.ThrowIfCanceled();
auto totalRead = int64_t();
// Take data from inner buffer if any
if (this->m_bodyStartInBuffer > 0)
{
// first time calling read. move to the next \r
if (this->m_bodyStartInBuffer > 0
&& this->m_bodyStartInBuffer + offset < this->m_innerBufferSize)
if (this->m_readBuffer[this->m_bodyStartInBuffer] == '\n' && this->m_sessionTotalRead == 0)
{
for (uint64_t index = 1; index < this->m_innerBufferSize - this->m_bodyStartInBuffer; index++)
// first read. Need to move to next position
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked)
{
if (this->m_readBuffer[this->m_bodyStartInBuffer + index] == '\r')
// For chunked, first advance until next `\r` after chunked size (\nsomeNumber\r\n)
auto nextPosition = std::find(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_readBuffer + this->m_innerBufferSize,
'\r');
if (nextPosition != this->m_readBuffer + this->m_innerBufferSize)
{
// found end of chunked info. Start reading from there
return ReadChunkedBody(buffer, bufferSize, offset + index + 1); // +1 to skip found '\r'
}
// Found possition of next '\r', +1 jumps the \r
this->m_bodyStartInBuffer
+= std::distance(this->m_readBuffer + this->m_bodyStartInBuffer, nextPosition) + 1;
// Check if the end of body is also at inner buffer
auto endOfChunk = std::find(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_readBuffer + this->m_innerBufferSize,
'\r');
if (endOfChunk != this->m_readBuffer + this->m_innerBufferSize)
{
this->m_innerBufferSize
-= std::distance(endOfChunk, this->m_readBuffer + this->m_innerBufferSize);
}
} // TODO: else read from wire until next \r
}
// Inner buffer only has part or chunked info. Set it as no body in it
// Then read again
this->m_bodyStartInBuffer = 0;
return ReadChunkedBody(buffer, bufferSize, offset);
this->m_bodyStartInBuffer += 1;
}
else
if (this->m_bodyStartInBuffer < this->m_innerBufferSize)
{
// nothing on internal buffer, and first read. Let's read from socket until we found \r
auto totalRead = uint64_t();
while (ReadSocketToBuffer(buffer, 1) != 0)
// still have data to take from innerbuffer
MemoryBodyStream innerBufferMemoryStream(
this->m_readBuffer + this->m_bodyStartInBuffer,
this->m_innerBufferSize - this->m_bodyStartInBuffer);
totalRead = innerBufferMemoryStream.Read(context, buffer, count);
this->m_bodyStartInBuffer += totalRead;
this->m_sessionTotalRead += totalRead;
if (this->m_bodyStartInBuffer == this->m_innerBufferSize)
{
totalRead += 1;
if (buffer[0] == '\r')
{
return ReadChunkedBody(buffer, bufferSize, offset + totalRead);
}
this->m_bodyStartInBuffer = 0; // read everyting from inner buffer already
}
// Didn't fin the end of chunked data in body. throw
throw;
return totalRead;
}
// After moving the reading start we reached the end
this->m_bodyStartInBuffer = 0;
}
uint64_t totalOffset = this->m_bodyStartInBuffer + offset;
auto writePosition = buffer;
auto toBeWritten = bufferSize;
auto bytesRead = uint64_t();
// At this point, offset must be greater than 0, and we are after \r. We must read \n next and
// then the body
if (this->m_bodyStartInBuffer > 0 && totalOffset < this->m_innerBufferSize)
// if the last position in inner buffer is `\r` it means the next
// thing we read from wire is `\n`. (usually this is when reading 1byte per time from wire)
if (this->m_readBuffer[this->m_innerBufferSize - 1] == '\r')
{
if (this->m_readBuffer[totalOffset] == '\n')
{
// increase offset and advance to next position
return ReadChunkedBody(buffer, bufferSize, offset + 1);
}
// Check if the end of chunked body is at inner buffer
auto endOfChunkedBody = std::find(
this->m_readBuffer + totalOffset, this->m_readBuffer + this->m_innerBufferSize, '\r');
if (endOfChunkedBody != this->m_readBuffer + this->m_innerBufferSize)
{
// reduce the size of the body to the end of body. This way trying to read more than the body
// end will end up reading up to the body end only
this->m_innerBufferSize
= std::distance(this->m_readBuffer + this->m_innerBufferSize, endOfChunkedBody);
toBeWritten = 0; // Setting to zero to avoid reading from buffer
}
// Still have some body content in internal buffer after skipping \n
if (bufferSize < this->m_innerBufferSize - totalOffset)
{
// requested less content than available in internal buffer
std::memcpy(buffer, this->m_readBuffer + totalOffset, (size_t)bufferSize);
return bufferSize;
}
// requested more than what it's available in internal buffer
bytesRead = this->m_innerBufferSize - totalOffset;
std::memcpy(buffer, this->m_readBuffer + totalOffset, (size_t)bytesRead + 1);
writePosition += bytesRead;
// setting toBeWritten
if (toBeWritten > 0)
{
toBeWritten -= bytesRead;
}
// Read one possition from socket on same user buffer, We wil override the value after this
ReadSocketToBuffer(buffer, 1);
}
if (toBeWritten > 0)
{
// Read from socket
bytesRead += ReadSocketToBuffer(writePosition, (size_t)toBeWritten);
if (bytesRead > 0)
{
// Check if reading include chunked termination and remove it if true
auto endOfBody = std::find(buffer, buffer + bytesRead, '\r');
if (endOfBody != buffer + bytesRead)
{
// Read all remaining to close connection
{
constexpr uint64_t finalRead = 50; // usually only 5 more bytes are gotten "0\r\n\r\n"
uint8_t b[finalRead];
ReadSocketToBuffer(b, finalRead);
curl_easy_cleanup(this->m_pCurl);
}
return bytesRead - std::distance(endOfBody, buffer + bytesRead) + 1;
}
return bytesRead; // didn't find end of body
}
}
// Return read bytes
return 0;
}
uint64_t CurlSession::ReadWithOffset(uint8_t* buffer, uint64_t bufferSize, uint64_t offset)
{
if (bufferSize <= 0)
if (this->m_bodyLengthType == ResponseBodyLengthType::ContentLength
&& this->m_sessionTotalRead == this->m_contentLength)
{
// Read everything already
curl_easy_cleanup(this->m_pCurl);
return 0;
}
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked)
// Read from socket
totalRead = ReadSocketToBuffer(buffer, static_cast<size_t>(count));
this->m_sessionTotalRead += totalRead;
if (this->m_bodyLengthType == ResponseBodyLengthType::Chunked && totalRead > 0)
{
// won't use content-length as the maximun to be read
return ReadChunkedBody(buffer, bufferSize, offset);
}
// calculate where to start reading from inner buffer
auto fixedOffset
= offset == 0 ? offset + 1 : offset; // advance the last '\n' from headers end on first read
auto innerBufferStart = this->m_bodyStartInBuffer + fixedOffset;
// total size from content-length header less any bytes already read
auto remainingBodySize = this->m_contentLength - fixedOffset;
// set ptr for writting
auto writePosition = buffer;
// Set the max to be written as the size of remaining body size
auto bytesToWrite = std::min<>(bufferSize, remainingBodySize);
// total of bytes read (any from inner buffer plus any from socket)
uint64_t bytesRead = uint64_t();
// If bodyStartInBuffer is set and while innerBufferStart is less than the buffer, it means there
// is still data at innerbuffer for the body that is not yet read
if (this->m_bodyStartInBuffer > 0 && this->m_innerBufferSize > innerBufferStart)
{
// Calculate the right size of innerBuffer:
// It can be smaller than the total body, in that case we will take as much as the size of
// buffer
// It can be bugger than the total body, in that case we will take as much as the size of the
// body
auto innerBufferWithBodyContent = this->m_innerBufferSize - innerBufferStart;
auto innerbufferSize = remainingBodySize < innerBufferWithBodyContent
? remainingBodySize
: innerBufferWithBodyContent;
// Requested less data than what we have at inner buffer, take it from innerBuffer
if (bufferSize <= innerbufferSize)
// Check if the end of chunked is part of the body
auto endOfBody = std::find(buffer, buffer + totalRead, '\r');
if (endOfBody != buffer + totalRead)
{
std::memcpy(writePosition, this->m_readBuffer + innerBufferStart, (size_t)bytesToWrite);
return bytesToWrite;
if (buffer[0] == '0' && buffer + 1 == endOfBody)
{
// got already the end
curl_easy_cleanup(this->m_pCurl);
return 0;
}
// Read all remaining to close connection
{
constexpr int64_t finalRead = 50; // usually only 5 more bytes are gotten "0\r\n\r\n"
uint8_t b[finalRead];
ReadSocketToBuffer(b, finalRead);
curl_easy_cleanup(this->m_pCurl);
}
totalRead -= std::distance(endOfBody, buffer + totalRead);
}
// Requested more data than what we have at innerbuffer. Take all from inner buffer and continue
std::memcpy(writePosition, this->m_readBuffer + innerBufferStart, (size_t)innerbufferSize);
// Return if all body was read and theres not need to read socket
if (innerbufferSize == remainingBodySize)
{
// libcurl handle can be clean now. We won't request more data
curl_easy_cleanup(this->m_pCurl);
return innerbufferSize;
}
// next write will be done after reading from socket, move ptr to where to write and how many
// to write
writePosition += innerbufferSize;
bytesToWrite -= innerbufferSize;
bytesRead += innerbufferSize;
}
// read from socket the remaining requested bytes
bytesRead += ReadSocketToBuffer(writePosition, (size_t)bytesToWrite);
if (remainingBodySize - bytesRead == 0)
{
// No more to read from socket
curl_easy_cleanup(this->m_pCurl);
}
return bytesRead;
return totalRead;
}
// Read from socket and return the number of bytes taken from socket
uint64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, size_t bufferSize)
int64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, int64_t bufferSize)
{
CURLcode readResult;
size_t readBytes = 0;
do // try to read from socket until response is OK
{
readResult = curl_easy_recv(this->m_pCurl, buffer, bufferSize, &readBytes);
readResult = curl_easy_recv(this->m_pCurl, buffer, static_cast<size_t>(bufferSize), &readBytes);
if (readResult == CURLE_AGAIN)
{
readResult = CURLE_AGAIN;
}
// socket not ready. Wait or fail on timeout
if (readResult == CURLE_AGAIN && !WaitForSocketReady(this->m_curlSocket, 1, 60000L))
{
@ -517,16 +436,12 @@ uint64_t CurlSession::ReadSocketToBuffer(uint8_t* buffer, size_t bufferSize)
std::unique_ptr<Azure::Core::Http::Response> CurlSession::GetResponse()
{
if (this->m_response != nullptr)
{
return std::move(this->m_response);
}
return nullptr;
return std::move(this->m_response);
}
size_t CurlSession::ResponseBufferParser::Parse(
int64_t CurlSession::ResponseBufferParser::Parse(
uint8_t const* const buffer,
size_t const bufferSize)
int64_t const bufferSize)
{
if (this->m_parseCompleted)
{
@ -562,9 +477,9 @@ size_t CurlSession::ResponseBufferParser::Parse(
}
// Finds delimiter '\r' as the end of the
size_t CurlSession::ResponseBufferParser::BuildStatusCode(
int64_t CurlSession::ResponseBufferParser::BuildStatusCode(
uint8_t const* const buffer,
size_t const bufferSize)
int64_t const bufferSize)
{
if (this->state != ResponseParserState::StatusLine)
{
@ -613,9 +528,9 @@ size_t CurlSession::ResponseBufferParser::BuildStatusCode(
}
// Finds delimiter '\r' as the end of the
size_t CurlSession::ResponseBufferParser::BuildHeader(
int64_t CurlSession::ResponseBufferParser::BuildHeader(
uint8_t const* const buffer,
size_t const bufferSize)
int64_t const bufferSize)
{
if (this->state != ResponseParserState::Headers)
{

View File

@ -78,18 +78,6 @@ std::map<std::string, std::string> Request::GetHeaders() const
return Request::MergeMaps(this->m_retryHeaders, this->m_headers);
}
// Return a reference to the unique ptr. We don't move the ownership here out of response.
BodyStream* Request::GetBodyStream()
{
if (this->m_bodyStream == nullptr)
{
// no body in request
return nullptr;
}
// retuning BodyStream* without removing the ownership from Request.
return m_bodyStream.get();
}
// Writes an HTTP request with RFC2730 without the body (head line and headers)
// https://tools.ietf.org/html/rfc7230#section-3.1.1
std::string Request::GetHTTPMessagePreBody() const

View File

@ -51,25 +51,3 @@ void Response::SetBodyStream(std::unique_ptr<BodyStream> stream)
{
this->m_bodyStream = std::move(stream);
}
std::unique_ptr<std::vector<uint8_t>> Response::ConstructBodyBufferFromStream(
BodyStream* const stream)
{
if (stream == nullptr)
{
return nullptr;
}
auto const bodySize = stream->Length();
if (bodySize <= 0)
{
// no body to get
return nullptr;
}
std::unique_ptr<std::vector<uint8_t>> unique_buffer(new std::vector<uint8_t>((size_t)bodySize));
auto buffer = unique_buffer.get()->data();
stream->Read(buffer, bodySize);
return unique_buffer;
}

View File

@ -1,8 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include <http/stream.hpp>
using namespace Azure::Core::Http;
BodyStream::~BodyStream() {}

View File

@ -8,6 +8,16 @@
#include "http/pipeline.hpp"
#ifdef POSIX
#include <fcntl.h>
#endif // Posix
#ifdef WINDOWS
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <Windows.h>
#endif // Windows
#include <http/curl/curl.hpp>
#include <http/http.hpp>
#include <iostream>
@ -20,24 +30,18 @@ using namespace std;
constexpr auto BufferSize = 50;
std::vector<uint8_t> buffer(BufferSize);
Http::Request createGetRequest();
Http::Request createPutRequest();
Http::Request createHeadRequest();
Http::Request createDeleteRequest();
Http::Request createPatchRequest();
void doGetRequest(Context context, HttpPipeline& pipeline);
void doPutRequest(Context context, HttpPipeline& pipeline);
void doHeadRequest(Context context, HttpPipeline& pipeline);
void doDeleteRequest(Context context, HttpPipeline& pipeline);
void doPatchRequest(Context context, HttpPipeline& pipeline);
void printRespose(std::unique_ptr<Http::Response> response);
void doFileRequest(Context context, HttpPipeline& pipeline);
int main()
{
try
{
// Both requests uses a body buffer to be uploaded that would produce responses with bodyBuffer
auto getRequest = createGetRequest();
auto putRequest = createPutRequest();
auto headRequest = createHeadRequest();
auto deleteRequest = createDeleteRequest();
auto patchRequest = createPatchRequest();
// Create the Transport
std::shared_ptr<HttpTransport> transport = std::make_unique<CurlTransport>();
std::vector<std::unique_ptr<HttpPolicy>> policies;
@ -47,28 +51,15 @@ int main()
// Add the transport policy
policies.push_back(std::make_unique<TransportPolicy>(std::move(transport)));
auto httpPipeline = Http::HttpPipeline(policies);
auto context = Context();
cout << endl << "GET:";
std::unique_ptr<Http::Response> getResponse = httpPipeline.Send(context, getRequest);
printRespose(std::move(getResponse));
cout << endl << "PUT:";
std::unique_ptr<Http::Response> putResponse = httpPipeline.Send(context, putRequest);
printRespose(std::move(putResponse));
cout << endl << "HEAD:";
std::unique_ptr<Http::Response> headResponse = httpPipeline.Send(context, headRequest);
printRespose(std::move(headResponse));
cout << endl << "DELETE:";
std::unique_ptr<Http::Response> deleteResponse = httpPipeline.Send(context, deleteRequest);
printRespose(std::move(deleteResponse));
cout << endl << "PATCH:";
std::unique_ptr<Http::Response> patchResponse = httpPipeline.Send(context, patchRequest);
printRespose(std::move(patchResponse));
// Both requests uses a body buffer to be uploaded that would produce responses with bodyBuffer
doFileRequest(context, httpPipeline);
doGetRequest(context, httpPipeline);
doPutRequest(context, httpPipeline);
doHeadRequest(context, httpPipeline);
doDeleteRequest(context, httpPipeline);
doPatchRequest(context, httpPipeline);
}
catch (Http::CouldNotResolveHostException& e)
{
@ -82,23 +73,82 @@ int main()
return 0;
}
Http::Request createGetRequest()
#ifdef POSIX
void doFileRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/put");
cout << "Creating a Put From File request to" << endl << "Host: " << host << endl;
// Open a file that contains: {{"key":"value"}, {"key2":"value2"}, {"key3":"value3"}}
int fd = open("/home/vagrant/workspace/a", O_RDONLY);
// Create Stream from file starting with offset 18 to 100
auto requestBodyStream = FileBodyStream(fd, 18, 100);
// Limit stream to read up to 17 postions ( {"key2","value2"} )
auto limitedStream = LimitBodyStream(&requestBodyStream, 17);
// Send request
auto request = Http::Request(Http::HttpMethod::Put, host, &limitedStream);
request.AddHeader("Content-Length", std::to_string(limitedStream.Length()));
auto response = pipeline.Send(context, request);
// File can be closed at this point
close(fd);
// Response Stream
auto bodyStreamResponse = response->GetBodyStream();
// limit to read response
auto limitedResponse = LimitBodyStream(bodyStreamResponse.get(), 300);
auto body = Http::BodyStream::ReadToEnd(context, limitedResponse);
cout << body.data() << endl << body.size() << endl;
}
#endif // Posix
#ifdef WINDOWS
void doFileRequest(Context context, HttpPipeline& pipeline)
{
(void)pipeline;
string host("https://httpbin.org/put");
cout << "Creating a File request to" << endl << "Host: " << host << endl;
// NOTE: To run the sample: Create folder 'home' on main hard drive (like C:/) and then add a file
// `a` in there
//
HANDLE hFile = CreateFile(
"/home/a",
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_SEQUENTIAL_SCAN,
NULL);
auto requestBodyStream = std::make_unique<FileBodyStream>(hFile, 20, 200);
auto body = Http::BodyStream::ReadToEnd(context, *requestBodyStream);
cout << body.data() << endl << body.size() << endl;
CloseHandle(hFile);
}
#endif // Windows
void doGetRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/get");
cout << "Creating a GET request to" << endl << "Host: " << host << endl;
auto request
= Http::Request(Http::HttpMethod::Get, host, std::make_unique<MemoryBodyStream>(buffer));
auto requestBodyStream = std::make_unique<MemoryBodyStream>(buffer.data(), buffer.size());
auto request = Http::Request(Http::HttpMethod::Get, host, requestBodyStream.get());
request.AddHeader("one", "header");
request.AddHeader("other", "header2");
request.AddHeader("header", "value");
request.AddHeader("Host", "httpbin.org");
return request;
cout << endl << "GET:";
printRespose(std::move(pipeline.Send(context, request)));
}
Http::Request createPutRequest()
void doPutRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/put");
cout << "Creating a PUT request to" << endl << "Host: " << host << endl;
@ -112,8 +162,8 @@ Http::Request createPutRequest()
buffer[BufferSize - 2] = '\"';
buffer[BufferSize - 1] = '}'; // set buffer to look like a Json `{"x":"xxx...xxx"}`
auto request
= Http::Request(Http::HttpMethod::Put, host, std::make_unique<MemoryBodyStream>(buffer));
auto requestBodyStream = std::make_unique<MemoryBodyStream>(buffer.data(), buffer.size());
auto request = Http::Request(Http::HttpMethod::Put, host, requestBodyStream.get());
request.AddHeader("one", "header");
request.AddHeader("other", "header2");
request.AddHeader("header", "value");
@ -121,7 +171,7 @@ Http::Request createPutRequest()
request.AddHeader("Host", "httpbin.org");
request.AddHeader("Content-Length", std::to_string(BufferSize));
return request;
printRespose(std::move(pipeline.Send(context, request)));
}
void printRespose(std::unique_ptr<Http::Response> response)
@ -149,19 +199,17 @@ void printRespose(std::unique_ptr<Http::Response> response)
// No body in response
return;
}
auto responseBodyVector = Http::Response::ConstructBodyBufferFromStream(bodyStream.get());
if (responseBodyVector != nullptr)
{
// print body only if response has a body. Head Response won't have body
auto bodyVector = *responseBodyVector.get();
cout << std::string(bodyVector.begin(), bodyVector.end()) << endl;
}
Context context;
auto responseBodyVector = Http::BodyStream::ReadToEnd(context, *bodyStream);
std::cin.ignore();
// print body only if response has a body. Head Response won't have body
cout << std::string(responseBodyVector.begin(), responseBodyVector.end()) << endl;
// std::cin.ignore();
return;
}
Http::Request createPatchRequest()
void doPatchRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/patch");
cout << "Creating an PATCH request to" << endl << "Host: " << host << endl;
@ -169,10 +217,11 @@ Http::Request createPatchRequest()
auto request = Http::Request(Http::HttpMethod::Patch, host);
request.AddHeader("Host", "httpbin.org");
return request;
cout << endl << "PATCH:";
printRespose(std::move(pipeline.Send(context, request)));
}
Http::Request createDeleteRequest()
void doDeleteRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/delete");
cout << "Creating an DELETE request to" << endl << "Host: " << host << endl;
@ -180,10 +229,11 @@ Http::Request createDeleteRequest()
auto request = Http::Request(Http::HttpMethod::Delete, host);
request.AddHeader("Host", "httpbin.org");
return request;
cout << endl << "DELETE:";
printRespose(std::move(pipeline.Send(context, request)));
}
Http::Request createHeadRequest()
void doHeadRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/get");
cout << "Creating an HEAD request to" << endl << "Host: " << host << endl;
@ -191,5 +241,6 @@ Http::Request createHeadRequest()
auto request = Http::Request(Http::HttpMethod::Head, host);
request.AddHeader("Host", "httpbin.org");
return request;
cout << endl << "HEAD:";
printRespose(std::move(pipeline.Send(context, request)));
}

View File

@ -25,27 +25,19 @@ constexpr auto BufferSize = 50;
std::vector<uint8_t> buffer(BufferSize);
// For StreamBody
constexpr auto StreamSize = 200;
constexpr auto StreamSize = 1024; // 100 MB
std::array<uint8_t, StreamSize> bufferStream;
Http::Request createGetRequest();
Http::Request createNoPathGetRequest();
Http::Request createPutRequest();
Http::Request createPutStreamRequest();
void printStream(std::unique_ptr<Http::Response> response);
void doGetRequest(Context context, HttpPipeline& pipeline);
void doNoPathGetRequest(Context context, HttpPipeline& pipeline);
void doPutRequest(Context context, HttpPipeline& pipeline);
void doPutStreamRequest(Context context, HttpPipeline& pipeline);
void printStream(Azure::Core::Context& context, std::unique_ptr<Http::Response> response);
int main()
{
try
{
// GetRequest. No body, produces stream
auto getRequest = createGetRequest();
// no path request
auto noPathRequest = createNoPathGetRequest();
// PutRequest. buffer body, produces stream
auto putRequest = createPutRequest();
// PutRequest. Stream body, produces stream
auto putStreamRequest = createPutStreamRequest();
// Create the Transport
std::shared_ptr<HttpTransport> transport = std::make_unique<CurlTransport>();
@ -64,17 +56,10 @@ int main()
std::unique_ptr<Http::Response> response;
auto context = Context();
response = httpPipeline.Send(context, getRequest);
printStream(std::move(response));
response = httpPipeline.Send(context, putRequest);
printStream(std::move(response));
response = httpPipeline.Send(context, putStreamRequest);
printStream(std::move(response));
response = httpPipeline.Send(context, noPathRequest);
printStream(std::move(response));
doGetRequest(context, httpPipeline);
doPutStreamRequest(context, httpPipeline);
doNoPathGetRequest(context, httpPipeline);
doPutRequest(context, httpPipeline);
}
catch (Http::CouldNotResolveHostException& e)
{
@ -89,7 +74,7 @@ int main()
}
// Request GET with no path
Http::Request createNoPathGetRequest()
void doNoPathGetRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org");
cout << "Creating a GET request to" << endl << "Host: " << host << endl;
@ -97,11 +82,11 @@ Http::Request createNoPathGetRequest()
auto request = Http::Request(Http::HttpMethod::Get, host);
request.AddHeader("Host", "httpbin.org");
return request;
printStream(context, std::move(pipeline.Send(context, request)));
}
// Request GET with no body that produces stream response
Http::Request createGetRequest()
void doGetRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/get//////?arg=1&arg2=2");
cout << "Creating a GET request to" << endl << "Host: " << host << endl;
@ -116,11 +101,12 @@ Http::Request createGetRequest()
request.AddQueryParameter("dinamicArg", "3");
request.AddQueryParameter("dinamicArg2", "4");
return request;
auto response = pipeline.Send(context, request);
printStream(context, std::move(response));
}
// Put Request with bodyBufferBody that produces stream
Http::Request createPutRequest()
void doPutRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/put/?a=1");
cout << "Creating a PUT request to" << endl << "Host: " << host << endl;
@ -134,21 +120,21 @@ Http::Request createPutRequest()
buffer[BufferSize - 2] = '\"';
buffer[BufferSize - 1] = '}'; // set buffer to look like a Json `{"x":"xxx...xxx"}`
auto request
= Http::Request(Http::HttpMethod::Put, host, std::make_unique<MemoryBodyStream>(buffer));
MemoryBodyStream requestBodyStream(buffer.data(), buffer.size());
auto request = Http::Request(Http::HttpMethod::Put, host, &requestBodyStream);
request.AddHeader("one", "header");
request.AddHeader("other", "header2");
request.AddHeader("header", "value");
request.AddHeader("Content-Length", std::to_string(BufferSize));
return request;
printStream(context, std::move(pipeline.Send(context, request)));
}
// Put Request with stream body that produces stream
Http::Request createPutStreamRequest()
void doPutStreamRequest(Context context, HttpPipeline& pipeline)
{
string host("https://httpbin.org/put");
string host("https://putsreq.com/SDywlz7z6j90bJFNvyTO");
cout << "Creating a PUT request to" << endl << "Host: " << host << endl;
bufferStream.fill('1');
@ -160,8 +146,9 @@ Http::Request createPutStreamRequest()
bufferStream[StreamSize - 2] = '\"';
bufferStream[StreamSize - 1] = '}'; // set buffer to look like a Json `{"1":"111...111"}`
auto request
= Http::Request(Http::HttpMethod::Put, host, std::make_unique<MemoryBodyStream>(buffer));
auto requestBodyStream
= std::make_unique<MemoryBodyStream>(bufferStream.data(), bufferStream.size());
auto request = Http::Request(Http::HttpMethod::Put, host, requestBodyStream.get());
request.AddHeader("one", "header");
request.AddHeader("other", "header2");
request.AddHeader("header", "value");
@ -172,15 +159,15 @@ Http::Request createPutStreamRequest()
request.AddQueryParameter("dinamicArg2", "1");
request.AddQueryParameter("dinamicArg3", "1");
return request;
printStream(context, std::move(pipeline.Send(context, request)));
}
void printStream(std::unique_ptr<Http::Response> response)
void printStream(Context& context, std::unique_ptr<Http::Response> response)
{
if (response == nullptr)
{
cout << "Error. Response returned as null";
std::cin.ignore();
// std::cin.ignore();
return;
}
@ -197,17 +184,16 @@ void printStream(std::unique_ptr<Http::Response> response)
uint8_t b[100];
auto bodyStream = response->GetBodyStream();
uint64_t readCount;
int64_t readCount;
do
{
readCount = bodyStream->Read(b, 10);
readCount = bodyStream->Read(context, b, 10);
cout << std::string(b, b + readCount);
} while (readCount > 0);
cout << endl << "Press any key to continue..." << endl;
std::cin.ignore();
// std::cin.ignore();
bodyStream->Close();
return;
}

View File

@ -18,7 +18,6 @@ set (AZURE_STORAGE_BLOB_HEADER
inc/common/shared_key_policy.hpp
inc/common/crypt.hpp
inc/common/xml_wrapper.hpp
inc/common/memory_stream.hpp
inc/blobs/blob.hpp
inc/blobs/blob_service_client.hpp
inc/blobs/blob_container_client.hpp
@ -58,7 +57,6 @@ set (AZURE_STORAGE_BLOB_SOURCE
src/common/storage_error.cpp
src/common/crypt.cpp
src/common/xml_wrapper.cpp
src/common/memory_stream.cpp
src/blobs/blob_service_client.cpp
src/blobs/blob_container_client.cpp
src/blobs/blob_client.cpp
@ -77,12 +75,12 @@ set (AZURE_STORAGE_DATALAKE_SOURCE
set(AZURE_STORAGE_HEADER
${AZURE_STORAGE_BLOB_HEADER}
${AZURE_STORAGE_DATALAKE_HEADER}
# ${AZURE_STORAGE_DATALAKE_HEADER}
)
set(AZURE_STORAGE_SOURCE
${AZURE_STORAGE_BLOB_SOURCE}
${AZURE_STORAGE_DATALAKE_SOURCE}
# ${AZURE_STORAGE_DATALAKE_SOURCE}
)
add_library(azure-storage ${AZURE_STORAGE_HEADER} ${AZURE_STORAGE_SOURCE})

View File

@ -117,7 +117,7 @@ namespace Azure { namespace Storage { namespace Blobs {
* @return A BlobAppendInfo describing the state of the updated append blob.
*/
BlobAppendInfo AppendBlock(
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
const AppendBlockOptions& options = AppendBlockOptions());
/**

View File

@ -9,4 +9,3 @@
#include "blobs/blob_service_client.hpp"
#include "blobs/block_blob_client.hpp"
#include "blobs/page_blob_client.hpp"
#include "common/memory_stream.hpp"

View File

@ -110,7 +110,7 @@ namespace Azure { namespace Storage { namespace Blobs {
* @return A BlobContentInfo describing the state of the updated block blob.
*/
BlobContentInfo Upload(
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
const UploadBlockBlobOptions& options = UploadBlockBlobOptions()) const;
/**
@ -125,7 +125,7 @@ namespace Azure { namespace Storage { namespace Blobs {
*/
BlockInfo StageBlock(
const std::string& blockId,
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
const StageBlockOptions& options = StageBlockOptions()) const;
/**

File diff suppressed because it is too large Load Diff

View File

@ -141,7 +141,7 @@ namespace Azure { namespace Storage { namespace Blobs {
* PageInfo describing the state of the updated pages.
*/
PageInfo UploadPages(
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
int64_t offset,
const UploadPagesOptions& options = UploadPagesOptions());

View File

@ -1,40 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#pragma once
#include "http/stream.hpp"
#include <memory>
#include <vector>
namespace Azure { namespace Storage {
class MemoryStream : public Azure::Core::Http::BodyStream {
public:
explicit MemoryStream(const uint8_t* data, std::size_t length) : m_data(data), m_length(length)
{
}
~MemoryStream() override {}
uint64_t Length() const override { return m_length; }
void Rewind() override { m_offset = 0; }
uint64_t Read(uint8_t* buffer, uint64_t count) override;
void Close() override {}
private:
const uint8_t* m_data;
std::size_t m_length;
std::size_t m_offset = 0;
};
inline std::unique_ptr<MemoryStream> CreateMemoryStream(const uint8_t* data, std::size_t length)
{
return std::make_unique<MemoryStream>(data, length);
}
}} // namespace Azure::Storage

View File

@ -8,7 +8,7 @@ add_executable (
main.cpp
samples_common.hpp
blob_getting_started.cpp
datalake_getting_started.cpp
# datalake_getting_started.cpp
)
target_link_libraries(azure-storage-sample PRIVATE azure-storage)

View File

@ -29,26 +29,21 @@ void BlobsGettingStarted()
BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
auto blobContentStream = Azure::Storage::CreateMemoryStream(
auto blobContentStream = Azure::Core::Http::MemoryBodyStream(
reinterpret_cast<const uint8_t*>(blobContent.data()), blobContent.length());
blobClient.Upload(std::move(blobContentStream));
blobClient.Upload(blobContentStream);
std::map<std::string, std::string> blobMetadata = {{"key1", "value1"}, {"key2", "value2"}};
blobClient.SetMetadata(blobMetadata);
auto blobDownloadContent = blobClient.Download();
blobContent.resize(static_cast<std::size_t>(blobDownloadContent.BodyStream->Length()));
std::size_t offset = 0;
while (true)
{
auto bytesRead = blobDownloadContent.BodyStream->Read(
reinterpret_cast<uint8_t*>(&blobContent[offset]), blobContent.length() - offset);
offset += static_cast<std::size_t>(bytesRead);
if (bytesRead == 0 || offset == blobContent.length())
{
break;
}
}
Azure::Core::Context context;
Azure::Core::Http::BodyStream::ReadToCount(
context,
*blobDownloadContent.BodyStream,
reinterpret_cast<uint8_t*>(&blobContent[0]),
blobDownloadContent.BodyStream->Length());
std::cout << blobContent << std::endl;

View File

@ -71,11 +71,10 @@ namespace Azure { namespace Storage { namespace Blobs {
}
BlobAppendInfo AppendBlobClient::AppendBlock(
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
const AppendBlockOptions& options)
{
BlobRestClient::AppendBlob::AppendBlockOptions protocolLayerOptions;
protocolLayerOptions.BodyStream = std::move(content);
protocolLayerOptions.ContentMD5 = options.ContentMD5;
protocolLayerOptions.ContentCRC64 = options.ContentCRC64;
protocolLayerOptions.LeaseId = options.LeaseId;
@ -86,7 +85,7 @@ namespace Azure { namespace Storage { namespace Blobs {
protocolLayerOptions.IfMatch = options.IfMatch;
protocolLayerOptions.IfNoneMatch = options.IfNoneMatch;
return BlobRestClient::AppendBlob::AppendBlock(
options.Context, *m_pipeline, m_blobUrl.ToString(), protocolLayerOptions);
options.Context, *m_pipeline, m_blobUrl.ToString(), content, protocolLayerOptions);
}
BlobAppendInfo AppendBlobClient::AppendBlockFromUri(

View File

@ -58,11 +58,10 @@ namespace Azure { namespace Storage { namespace Blobs {
}
BlobContentInfo BlockBlobClient::Upload(
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
const UploadBlockBlobOptions& options) const
{
BlobRestClient::BlockBlob::UploadOptions protocolLayerOptions;
protocolLayerOptions.BodyStream = std::move(content);
protocolLayerOptions.ContentMD5 = options.ContentMD5;
protocolLayerOptions.ContentCRC64 = options.ContentCRC64;
protocolLayerOptions.Properties = options.Properties;
@ -73,21 +72,20 @@ namespace Azure { namespace Storage { namespace Blobs {
protocolLayerOptions.IfMatch = options.IfMatch;
protocolLayerOptions.IfNoneMatch = options.IfNoneMatch;
return BlobRestClient::BlockBlob::Upload(
options.Context, *m_pipeline, m_blobUrl.ToString(), protocolLayerOptions);
options.Context, *m_pipeline, m_blobUrl.ToString(), content, protocolLayerOptions);
}
BlockInfo BlockBlobClient::StageBlock(
const std::string& blockId,
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
const StageBlockOptions& options) const
{
BlobRestClient::BlockBlob::StageBlockOptions protocolLayerOptions;
protocolLayerOptions.BodyStream = std::move(content);
protocolLayerOptions.BlockId = blockId;
protocolLayerOptions.ContentMD5 = options.ContentMD5;
protocolLayerOptions.ContentCRC64 = options.ContentCRC64;
return BlobRestClient::BlockBlob::StageBlock(
options.Context, *m_pipeline, m_blobUrl.ToString(), protocolLayerOptions);
options.Context, *m_pipeline, m_blobUrl.ToString(), content, protocolLayerOptions);
}
BlockInfo BlockBlobClient::StageBlockFromUri(

View File

@ -74,14 +74,12 @@ namespace Azure { namespace Storage { namespace Blobs {
}
PageInfo PageBlobClient::UploadPages(
std::unique_ptr<Azure::Core::Http::BodyStream> content,
Azure::Core::Http::BodyStream& content,
int64_t offset,
const UploadPagesOptions& options)
{
BlobRestClient::PageBlob::UploadPagesOptions protocolLayerOptions;
protocolLayerOptions.BodyStream = std::move(content);
protocolLayerOptions.Range
= std::make_pair(offset, offset + protocolLayerOptions.BodyStream->Length() - 1);
protocolLayerOptions.Range = std::make_pair(offset, offset + content.Length() - 1);
protocolLayerOptions.ContentMD5 = options.ContentMD5;
protocolLayerOptions.ContentCRC64 = options.ContentCRC64;
protocolLayerOptions.LeaseId = options.LeaseId;
@ -90,7 +88,7 @@ namespace Azure { namespace Storage { namespace Blobs {
protocolLayerOptions.IfMatch = options.IfMatch;
protocolLayerOptions.IfNoneMatch = options.IfNoneMatch;
return BlobRestClient::PageBlob::UploadPages(
options.Context, *m_pipeline, m_blobUrl.ToString(), protocolLayerOptions);
options.Context, *m_pipeline, m_blobUrl.ToString(), content, protocolLayerOptions);
}
PageInfo PageBlobClient::UploadPagesFromUri(

View File

@ -1,18 +0,0 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#include "common/memory_stream.hpp"
#include <algorithm>
namespace Azure { namespace Storage {
uint64_t MemoryStream::Read(uint8_t* buffer, uint64_t count)
{
std::size_t readSize = static_cast<std::size_t>(std::min(count, static_cast<uint64_t>(m_length - m_offset)));
std::copy(m_data + m_offset, m_data + m_offset + readSize, buffer);
m_offset += readSize;
return readSize;
}
}} // namespace Azure::Storage

View File

@ -12,10 +12,12 @@ namespace Azure { namespace Storage {
std::unique_ptr<Azure::Core::Http::Response> response)
{
auto bodyStream = response->GetBodyStream();
auto bodyBuffer = std::make_unique<std::vector<uint8_t>>();
std::vector<uint8_t> bodyBuffer;
if (bodyStream != nullptr)
{
bodyBuffer = Azure::Core::Http::Response::ConstructBodyBufferFromStream(bodyStream.get());
// TODO: get the real context somewhere
Azure::Core::Context context;
bodyBuffer = Azure::Core::Http::BodyStream::ReadToEnd(context, *bodyStream);
}
auto httpStatusCode = response->GetStatusCode();
@ -40,7 +42,7 @@ namespace Azure { namespace Storage {
if (response->GetHeaders().at("Content-Type").find("xml") != std::string::npos)
{
auto xmlReader
= XmlReader(reinterpret_cast<const char*>(bodyBuffer->data()), bodyBuffer->size());
= XmlReader(reinterpret_cast<const char*>(bodyBuffer.data()), bodyBuffer.size());
enum class XmlTagName
{
@ -101,12 +103,12 @@ namespace Azure { namespace Storage {
else if (response->GetHeaders().at("Content-Type").find("html") != std::string::npos)
{
// TODO: add a refined message parsed from result.
message = std::string(bodyBuffer->begin(), bodyBuffer->end());
message = std::string(bodyBuffer.begin(), bodyBuffer.end());
}
else
{
// TODO: add a refined message parsed from result.
message = std::string(bodyBuffer->begin(), bodyBuffer->end());
message = std::string(bodyBuffer.begin(), bodyBuffer.end());
}
}

View File

@ -29,8 +29,9 @@ namespace Azure { namespace Storage { namespace Test {
m_blobUploadOptions.Properties.ContentEncoding = "identify";
m_blobUploadOptions.Properties.ContentMD5 = "";
m_appendBlobClient->Create(m_blobUploadOptions);
m_appendBlobClient->AppendBlock(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()));
auto blockContent
= Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
m_appendBlobClient->AppendBlock(blockContent);
m_blobUploadOptions.Properties.ContentMD5 = m_appendBlobClient->GetProperties().ContentMD5;
}
@ -47,34 +48,29 @@ namespace Azure { namespace Storage { namespace Test {
EXPECT_EQ(properties.CommittedBlockCount.GetValue(), 0);
EXPECT_EQ(properties.ContentLength, 0);
appendBlobClient.AppendBlock(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()));
auto blockContent
= Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
appendBlobClient.AppendBlock(blockContent);
properties = appendBlobClient.GetProperties();
EXPECT_EQ(properties.CommittedBlockCount.GetValue(), 1);
EXPECT_EQ(properties.ContentLength, static_cast<int64_t>(m_blobContent.size()));
Azure::Storage::Blobs::AppendBlockOptions options;
options.AppendPosition = 1_MB;
EXPECT_THROW(
appendBlobClient.AppendBlock(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()),
options),
std::runtime_error);
blockContent = Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
EXPECT_THROW(appendBlobClient.AppendBlock(blockContent, options), std::runtime_error);
options.AppendPosition = properties.ContentLength;
appendBlobClient.AppendBlock(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()), options);
blockContent = Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
appendBlobClient.AppendBlock(blockContent, options);
properties = appendBlobClient.GetProperties();
options = Azure::Storage::Blobs::AppendBlockOptions();
options.MaxSize = properties.ContentLength + m_blobContent.size() - 1;
EXPECT_THROW(
appendBlobClient.AppendBlock(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()),
options),
std::runtime_error);
blockContent = Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
EXPECT_THROW(appendBlobClient.AppendBlock(blockContent, options), std::runtime_error);
options.MaxSize = properties.ContentLength + m_blobContent.size();
appendBlobClient.AppendBlock(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()), options);
blockContent = Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
appendBlobClient.AppendBlock(blockContent, options);
// TODO: AppendBlockFromUri must be authorized with SAS, but we don't have SAS for now.

View File

@ -85,7 +85,8 @@ namespace Azure { namespace Storage { namespace Test {
{
std::string blobName = prefix1 + baseName + std::to_string(i);
auto blobClient = m_blobContainerClient->GetBlockBlobClient(blobName);
blobClient.Upload(Azure::Storage::CreateMemoryStream(nullptr, 0));
auto emptyContent = Azure::Core::Http::MemoryBodyStream(nullptr, 0);
blobClient.Upload(emptyContent);
p1Blobs.insert(blobName);
p1p2Blobs.insert(blobName);
}
@ -93,7 +94,8 @@ namespace Azure { namespace Storage { namespace Test {
{
std::string blobName = prefix2 + baseName + std::to_string(i);
auto blobClient = m_blobContainerClient->GetBlockBlobClient(blobName);
blobClient.Upload(Azure::Storage::CreateMemoryStream(nullptr, 0));
auto emptyContent = Azure::Core::Http::MemoryBodyStream(nullptr, 0);
blobClient.Upload(emptyContent);
p2Blobs.insert(blobName);
p1p2Blobs.insert(blobName);
}
@ -151,7 +153,8 @@ namespace Azure { namespace Storage { namespace Test {
{
blobName = blobName + delimiter + RandomString();
auto blobClient = m_blobContainerClient->GetBlockBlobClient(blobName);
blobClient.Upload(Azure::Storage::CreateMemoryStream(nullptr, 0));
auto emptyContent = Azure::Core::Http::MemoryBodyStream(nullptr, 0);
blobClient.Upload(emptyContent);
blobs.insert(blobName);
}

View File

@ -44,9 +44,9 @@ namespace Azure { namespace Storage { namespace Test {
m_blobUploadOptions.Properties.ContentEncoding = "identity";
m_blobUploadOptions.Properties.ContentMD5 = "";
m_blobUploadOptions.Tier = Azure::Storage::Blobs::AccessTier::Hot;
m_blockBlobClient->Upload(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()),
m_blobUploadOptions);
auto blobContent
= Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
m_blockBlobClient->Upload(blobContent, m_blobUploadOptions);
m_blobUploadOptions.Properties.ContentMD5 = m_blockBlobClient->GetProperties().ContentMD5;
}
@ -56,9 +56,9 @@ namespace Azure { namespace Storage { namespace Test {
{
auto blockBlobClient = Azure::Storage::Blobs::BlockBlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), m_containerName, RandomString());
blockBlobClient.Upload(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()),
m_blobUploadOptions);
auto blobContent
= Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
blockBlobClient.Upload(blobContent, m_blobUploadOptions);
blockBlobClient.Delete();
EXPECT_THROW(blockBlobClient.Delete(), std::runtime_error);
@ -131,8 +131,8 @@ namespace Azure { namespace Storage { namespace Test {
auto snapshotClient = m_blockBlobClient->WithSnapshot(res.Snapshot);
EXPECT_EQ(ReadBodyStream(snapshotClient.Download().BodyStream), m_blobContent);
EXPECT_EQ(snapshotClient.GetProperties().Metadata, m_blobUploadOptions.Metadata);
EXPECT_THROW(
snapshotClient.Upload(Azure::Storage::CreateMemoryStream(nullptr, 0)), std::runtime_error);
auto emptyContent = Azure::Core::Http::MemoryBodyStream(nullptr, 0);
EXPECT_THROW(snapshotClient.Upload(emptyContent), std::runtime_error);
EXPECT_THROW(snapshotClient.SetMetadata({}), std::runtime_error);
EXPECT_THROW(
snapshotClient.SetAccessTier(Azure::Storage::Blobs::AccessTier::Cool), std::runtime_error);
@ -150,8 +150,9 @@ namespace Azure { namespace Storage { namespace Test {
{
auto blockBlobClient = Azure::Storage::Blobs::BlockBlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), m_containerName, RandomString());
blockBlobClient.Upload(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()));
auto blobContent
= Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
blockBlobClient.Upload(blobContent);
blockBlobClient.SetMetadata(m_blobUploadOptions.Metadata);
blockBlobClient.SetAccessTier(Azure::Storage::Blobs::AccessTier::Cool);
Azure::Storage::Blobs::SetBlobHttpHeadersOptions options;
@ -191,8 +192,9 @@ namespace Azure { namespace Storage { namespace Test {
std::vector<uint8_t> block1Content;
block1Content.resize(100);
RandomBuffer(reinterpret_cast<char*>(&block1Content[0]), block1Content.size());
blockBlobClient.StageBlock(
blockId1, Azure::Storage::CreateMemoryStream(block1Content.data(), block1Content.size()));
auto blockContent
= Azure::Core::Http::MemoryBodyStream(block1Content.data(), block1Content.size());
blockBlobClient.StageBlock(blockId1, blockContent);
Azure::Storage::Blobs::CommitBlockListOptions options;
options.Properties = m_blobUploadOptions.Properties;
options.Metadata = m_blobUploadOptions.Metadata;

View File

@ -29,8 +29,9 @@ namespace Azure { namespace Storage { namespace Test {
m_blobUploadOptions.Properties.ContentEncoding = "identity";
m_blobUploadOptions.Properties.ContentMD5 = "";
m_pageBlobClient->Create(m_blobContent.size(), m_blobUploadOptions);
m_pageBlobClient->UploadPages(
Azure::Storage::CreateMemoryStream(m_blobContent.data(), m_blobContent.size()), 0);
auto pageContent
= Azure::Core::Http::MemoryBodyStream(m_blobContent.data(), m_blobContent.size());
m_pageBlobClient->UploadPages(pageContent, 0);
m_blobUploadOptions.Properties.ContentMD5 = m_pageBlobClient->GetProperties().ContentMD5;
}
@ -68,8 +69,8 @@ namespace Azure { namespace Storage { namespace Test {
auto pageBlobClient = Azure::Storage::Blobs::PageBlobClient::CreateFromConnectionString(
StandardStorageConnectionString(), m_containerName, RandomString());
pageBlobClient.Create(8_KB, m_blobUploadOptions);
pageBlobClient.UploadPages(
Azure::Storage::CreateMemoryStream(blobContent.data(), blobContent.size()), 2_KB);
auto pageContent = Azure::Core::Http::MemoryBodyStream(blobContent.data(), blobContent.size());
pageBlobClient.UploadPages(pageContent, 2_KB);
// |_|_|x|x| |x|x|_|_|
blobContent.insert(blobContent.begin(), static_cast<std::size_t>(2_KB), '\x00');
blobContent.resize(static_cast<std::size_t>(8_KB), '\x00');
@ -101,8 +102,8 @@ namespace Azure { namespace Storage { namespace Test {
auto snapshot = pageBlobClient.CreateSnapshot().Snapshot;
// |_|_|_|x| |x|x|_|_| This is what's in snapshot
blobContent.resize(static_cast<std::size_t>(1_KB));
pageBlobClient.UploadPages(
Azure::Storage::CreateMemoryStream(blobContent.data(), blobContent.size()), 0);
auto pageClient = Azure::Core::Http::MemoryBodyStream(blobContent.data(), blobContent.size());
pageBlobClient.UploadPages(pageClient, 0);
pageBlobClient.ClearPages(3_KB, 1_KB);
// |x|_|_|_| |x|x|_|_|

View File

@ -132,42 +132,4 @@ namespace Azure { namespace Storage { namespace Test {
}
}
std::vector<uint8_t> ReadBodyStream(std::unique_ptr<Azure::Core::Http::BodyStream>& stream)
{
std::vector<uint8_t> body;
if (stream->Length() == static_cast<decltype(stream->Length())>(-1))
{
std::size_t bufferSize = static_cast<std::size_t>(16_KB);
auto readBuffer = std::make_unique<uint8_t[]>(bufferSize);
while (true)
{
auto bytesRead = stream->Read(readBuffer.get(), bufferSize);
if (bytesRead == 0)
{
break;
}
body.insert(body.end(), readBuffer.get(), readBuffer.get() + bytesRead);
}
}
else
{
body.resize(static_cast<std::size_t>(stream->Length()));
std::size_t offset = 0;
while (true)
{
auto bytesRead = stream->Read(&body[offset], body.size() - offset);
offset += static_cast<std::size_t>(bytesRead);
if (bytesRead == 0 || offset == body.size())
{
break;
}
}
if (offset != body.size())
{
throw std::runtime_error("failed to read all content from body stream");
}
}
return body;
}
}}} // namespace Azure::Storage::Test

View File

@ -3,7 +3,7 @@
#pragma once
#include "http/stream.hpp"
#include "http/body_stream.hpp"
#include "gtest/gtest.h"
@ -29,9 +29,14 @@ namespace Azure { namespace Storage { namespace Test {
void RandomBuffer(char* buffer, std::size_t length);
std::vector<uint8_t> ReadBodyStream(std::unique_ptr<Azure::Core::Http::BodyStream>& stream);
inline std::vector<uint8_t> ReadBodyStream(std::unique_ptr<Azure::Core::Http::BodyStream>& stream)
{
Azure::Core::Context context;
return Azure::Core::Http::BodyStream::ReadToEnd(context, *stream);
}
inline std::vector<uint8_t> ReadBodyStream(std::unique_ptr<Azure::Core::Http::BodyStream>&& stream)
inline std::vector<uint8_t> ReadBodyStream(
std::unique_ptr<Azure::Core::Http::BodyStream>&& stream)
{
return ReadBodyStream(stream);
}