Perf fix windows (#686)
* sample for repro * adding repro app * reset win socket for perf windows
This commit is contained in:
parent
21d4e4c5b4
commit
7c32578798
@ -9,9 +9,11 @@
|
||||
|
||||
#include "azure/core/http/http.hpp"
|
||||
#include "azure/core/http/policy.hpp"
|
||||
#include "azure/core/internal/log.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <curl/curl.h>
|
||||
#include <functional>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
@ -523,13 +525,25 @@ namespace Azure { namespace Core { namespace Http {
|
||||
: this->m_contentLength == this->m_sessionTotalRead;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief function to log
|
||||
*
|
||||
*/
|
||||
std::function<void(std::string const& message)> m_logger;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Construct a new Curl Session object. Init internal libcurl handler.
|
||||
*
|
||||
* @param request reference to an HTTP Request.
|
||||
*/
|
||||
CurlSession(Request& request) : m_request(request)
|
||||
CurlSession(Request& request)
|
||||
: CurlSession(request, [](std::string const& message) { (void)message; })
|
||||
{
|
||||
}
|
||||
|
||||
CurlSession(Request& request, std::function<void(std::string const& message)> logger)
|
||||
: m_request(request), m_logger(logger)
|
||||
{
|
||||
this->m_connection = CurlConnectionPool::GetCurlConnection(this->m_request);
|
||||
this->m_bodyStartInBuffer = -1;
|
||||
|
||||
@ -312,6 +312,9 @@ namespace Azure { namespace Core { namespace Http {
|
||||
|
||||
/// HTTP retry attempt.
|
||||
static constexpr auto const Retry = Classification(3);
|
||||
|
||||
/// HTTP Transport adapter.
|
||||
static constexpr auto const HttpTransportAdapter = Classification(4);
|
||||
};
|
||||
|
||||
}}} // namespace Azure::Core::Http
|
||||
|
||||
@ -84,6 +84,39 @@ int pollSocketUntilEventOrTimeout(
|
||||
return WSAPoll(&poller, 1, timeout);
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef WINDOWS
|
||||
// Windows needs this after every write to socket or peformance would be reduced to 1/4 for
|
||||
// uploading operation.
|
||||
// https://github.com/Azure/azure-sdk-for-cpp/issues/644
|
||||
void WinSocketSetBuffSize(curl_socket_t socket, std::function<void(std::string const& message)> logger)
|
||||
{
|
||||
ULONG ideal;
|
||||
DWORD ideallen;
|
||||
// WSAloctl would get the ideal size for the socket buffer.
|
||||
if (WSAIoctl(socket, SIO_IDEAL_SEND_BACKLOG_QUERY, 0, 0, &ideal, sizeof(ideal), &ideallen, 0, 0)
|
||||
== 0)
|
||||
{
|
||||
// if WSAloctl succeded (returned 0), set the socket buffer size.
|
||||
// Specifies the total per-socket buffer space reserved for sends.
|
||||
// https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-setsockopt
|
||||
auto result = setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (const char*)&ideal, sizeof(ideal));
|
||||
logger("Windows - calling setsockopt after uploading chunk. ideal = " + std::to_string(ideal) + " result = " + std::to_string(result));
|
||||
}
|
||||
}
|
||||
#endif // WINDOWS
|
||||
|
||||
// Can be used from anywhere a little simpler
|
||||
inline void LogThis(std::string const& msg)
|
||||
{
|
||||
if (Azure::Core::Logging::Details::ShouldWrite(
|
||||
Azure::Core::Http::LogClassification::HttpTransportAdapter))
|
||||
{
|
||||
Azure::Core::Logging::Details::Write(
|
||||
Azure::Core::Http::LogClassification::HttpTransportAdapter,
|
||||
"[CURL Transport Adapter]: " + msg);
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
using namespace Azure::Core::Http;
|
||||
@ -91,7 +124,8 @@ using namespace Azure::Core::Http;
|
||||
std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request& request)
|
||||
{
|
||||
// Create CurlSession to perform request
|
||||
auto session = std::make_unique<CurlSession>(request);
|
||||
LogThis("Creating a new session.");
|
||||
auto session = std::make_unique<CurlSession>(request, LogThis);
|
||||
CURLcode performing;
|
||||
|
||||
// Try to send the request. If we get CURLE_UNSUPPORTED_PROTOCOL back it means the connection is
|
||||
@ -126,6 +160,7 @@ std::unique_ptr<RawResponse> CurlTransport::Send(Context const& context, Request
|
||||
}
|
||||
}
|
||||
|
||||
LogThis("Request completed. Moving respone out of session and session to response.");
|
||||
// Move Response out of the session
|
||||
auto response = session->GetResponse();
|
||||
// Move the ownership of the CurlSession (bodyStream) to the response
|
||||
@ -152,11 +187,13 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
auto hostHeader = headers.find("Host");
|
||||
if (hostHeader == headers.end())
|
||||
{
|
||||
this->m_logger("No Host in request headers. Adding it");
|
||||
this->m_request.AddHeader("Host", this->m_request.GetUrl().GetHost());
|
||||
}
|
||||
auto isContentLengthHeaderInRequest = headers.find("content-length");
|
||||
if (isContentLengthHeaderInRequest == headers.end())
|
||||
{
|
||||
this->m_logger("No content-length in headers. Adding it");
|
||||
this->m_request.AddHeader(
|
||||
"content-length", std::to_string(this->m_request.GetBodyStream()->Length()));
|
||||
}
|
||||
@ -165,18 +202,21 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
// use expect:100 for PUT requests. Server will decide if it can take our request
|
||||
if (this->m_request.GetMethod() == HttpMethod::Put)
|
||||
{
|
||||
this->m_logger("Using 100-continue for PUT request");
|
||||
this->m_request.AddHeader("expect", "100-continue");
|
||||
}
|
||||
|
||||
// Send request. If the connection assigned to this curlSession is closed or the socket is
|
||||
// somehow lost, libcurl will return CURLE_UNSUPPORTED_PROTOCOL
|
||||
// (https://curl.haxx.se/libcurl/c/curl_easy_send.html). Return the error back.
|
||||
this->m_logger("Send request without payload");
|
||||
result = SendRawHttp(context);
|
||||
if (result != CURLE_OK)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
|
||||
this->m_logger("Parse server response");
|
||||
ReadStatusLineAndHeadersFromRawResponse();
|
||||
|
||||
// Upload body for PUT
|
||||
@ -185,13 +225,17 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
return result;
|
||||
}
|
||||
|
||||
this->m_logger("Check server response before upload starts");
|
||||
|
||||
// Check server response from Expect:100-continue for PUT;
|
||||
// This help to prevent us from start uploading data when Server can't handle it
|
||||
if (this->m_lastStatusCode != HttpStatusCode::Continue)
|
||||
{
|
||||
this->m_logger("Server rejected the upload request");
|
||||
return result; // Won't upload.
|
||||
}
|
||||
|
||||
this->m_logger("Upload payload");
|
||||
if (this->m_bodyStartInBuffer > 0)
|
||||
{
|
||||
// If internal buffer has more data after the 100-continue means Server return an error.
|
||||
@ -206,6 +250,8 @@ CURLcode CurlSession::Perform(Context const& context)
|
||||
{
|
||||
return result; // will throw transport exception before trying to read
|
||||
}
|
||||
|
||||
this->m_logger("Upload completed. Parse server response");
|
||||
ReadStatusLineAndHeadersFromRawResponse();
|
||||
return result;
|
||||
}
|
||||
@ -298,7 +344,9 @@ CURLcode CurlSession::SendBuffer(uint8_t const* buffer, size_t bufferSize)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#ifdef WINDOWS
|
||||
WinSocketSetBuffSize(this->m_curlSocket, this->m_logger);
|
||||
#endif // WINDOWS
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
@ -644,6 +692,9 @@ int64_t CurlSession::ReadFromSocket(uint8_t* buffer, int64_t bufferSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
#ifdef WINDOWS
|
||||
WinSocketSetBuffSize(this->m_curlSocket, this->m_logger);
|
||||
#endif // WINDOWS
|
||||
return readBytes;
|
||||
}
|
||||
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
#include "azure/core/http/http.hpp"
|
||||
#include "azure/core/http/policy.hpp"
|
||||
#include "azure/core/http/http.hpp"
|
||||
|
||||
using namespace Azure::Core::Http;
|
||||
|
||||
@ -12,6 +12,8 @@ using namespace Azure::Core::Http;
|
||||
Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Request;
|
||||
Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Response;
|
||||
Azure::Core::Logging::LogClassification const Azure::Core::Http::LogClassification::Retry;
|
||||
Azure::Core::Logging::LogClassification const
|
||||
Azure::Core::Http::LogClassification::HttpTransportAdapter;
|
||||
#endif
|
||||
|
||||
std::unique_ptr<RawResponse> NextHttpPolicy::Send(Context const& ctx, Request& req)
|
||||
|
||||
@ -8,6 +8,7 @@ set(TARGET_NAME "azure_core_with_curl")
|
||||
set(TARGET_NAME_STREAM "azure_core_with_curl_stream")
|
||||
set(TARGET_NAME_STORAGE_ISSUE_249 "azure_core_storage_issue_249")
|
||||
set(TARGET_NAME_STORAGE_ISSUE_248 "azure_core_storage_issue_248")
|
||||
set(TARGET_NAME_STORAGE_REUSE_CONNECTION "azure_hang")
|
||||
|
||||
project(${TARGET_NAME} LANGUAGES CXX)
|
||||
set(CMAKE_CXX_STANDARD 14)
|
||||
@ -33,9 +34,15 @@ add_executable (
|
||||
azure_core_storage_list_containers_sample
|
||||
)
|
||||
|
||||
add_executable (
|
||||
${TARGET_NAME_STORAGE_REUSE_CONNECTION}
|
||||
azure_hang
|
||||
)
|
||||
|
||||
target_link_libraries(${TARGET_NAME} PRIVATE azure-core)
|
||||
target_link_libraries(${TARGET_NAME_STREAM} PRIVATE azure-core)
|
||||
target_link_libraries(${TARGET_NAME_STORAGE_ISSUE_249} PRIVATE azure-core)
|
||||
target_link_libraries(${TARGET_NAME_STORAGE_ISSUE_248} PRIVATE azure-core)
|
||||
target_link_libraries(${TARGET_NAME_STORAGE_REUSE_CONNECTION} PRIVATE azure-core azure::storage::files::datalake azure-storage-common)
|
||||
|
||||
endif()
|
||||
|
||||
72
sdk/core/azure-core/test/e2e/azure_hang.cpp
Normal file
72
sdk/core/azure-core/test/e2e/azure_hang.cpp
Normal file
@ -0,0 +1,72 @@
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// SPDX-License-Identifier: MIT
|
||||
|
||||
/**
|
||||
* @brief Performs upload and download for n times trying to catch long running bugs.
|
||||
* Originally set to use 50Mb for 2000 times, which takes ~2.5hours to complete.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifdef _MSC_VER
|
||||
#define _CRT_SECURE_NO_WARNINGS
|
||||
#endif
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include <azure/storage/blobs/blob.hpp>
|
||||
|
||||
#define BLOB_SIZE 50 * 1024ULL * 1024
|
||||
#define REPEAT_FOR 2000
|
||||
#define CONCURRENCY 16
|
||||
|
||||
int main()
|
||||
{
|
||||
using namespace Azure::Storage::Blobs;
|
||||
|
||||
std::string containerName = "sample-container";
|
||||
std::string blobName = "sample-blob";
|
||||
std::string blobContent;
|
||||
blobContent.resize(BLOB_SIZE, 'c');
|
||||
|
||||
std::string connString(std::getenv("STORAGE_CONNECTION_STRING"));
|
||||
|
||||
auto containerClient = BlobContainerClient::CreateFromConnectionString(connString, containerName);
|
||||
try
|
||||
{
|
||||
containerClient.Create();
|
||||
}
|
||||
catch (std::runtime_error& e)
|
||||
{
|
||||
// The container may already exist
|
||||
std::cout << e.what() << std::endl;
|
||||
}
|
||||
|
||||
BlockBlobClient blobClient = containerClient.GetBlockBlobClient(blobName);
|
||||
|
||||
for (int i = 0; i < REPEAT_FOR; ++i)
|
||||
{
|
||||
{
|
||||
UploadBlockBlobFromOptions options;
|
||||
options.Concurrency = CONCURRENCY;
|
||||
blobClient.UploadFrom(
|
||||
reinterpret_cast<const uint8_t*>(blobContent.data()), blobContent.size(), options);
|
||||
}
|
||||
|
||||
{
|
||||
std::string download;
|
||||
download.resize(BLOB_SIZE, '.');
|
||||
DownloadBlobToOptions options;
|
||||
options.Concurrency = CONCURRENCY;
|
||||
blobClient.DownloadTo(reinterpret_cast<uint8_t*>(&download[0]), download.size(), options);
|
||||
|
||||
// make sure download content is the one expected
|
||||
if (download != blobContent)
|
||||
{
|
||||
std::cout << "Downloaded content is not the same" << std::endl;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << i << std::endl;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user