Reworked EventHubs Stress test to export information to OpenTelemetry (#5370)

# Significant rewrite of Eventhubs stress test.

## Cleaned up implementation of EventHubs stress/reliability test

* Reworked stress test layout to isolate stress test implementation from stress deployment elements.
* Added ability of stress test scenarios to be independently configured.
* Rewrote stress test app to be closer to the Go equivalent app.
* added apparg.hpp for command line parsing
* added scope_guard to test for scope_guard implementation.

## Added close methods to Consumer client and Producer client.

They don't do much currently, but will eventually.

## AI Generated summary of pull request

This pull request primarily focuses on the `azure-core-amqp` and `azure-messaging-eventhubs` packages, introducing changes to improve code efficiency and maintainability. The most significant changes include the removal of unnecessary `#include` directives in various files in the `azure-core-amqp` package, the addition of `Close` methods in `consumer_client.hpp` and `producer_client.hpp` in the `azure-messaging-eventhubs` package, and modifications to the `cgmanifest.json` files in the `azure-messaging-eventhubs-checkpointstore-blob` and `azure-messaging-eventhubs` directories.

Removal of unnecessary `#include` directives:

* [`sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp`](diffhunk://#diff-5acd7049cef5955540cc4253f264207e5a7d2701612e148c736ca5781e69d224L12-L14): Removed unnecessary `#include` directives for `iostream` and `sstream`.
* [`sdk/core/azure-core-amqp/src/amqp/connection.cpp`](diffhunk://#diff-fc3a6e5b11f1254c4fd344bcd1846c83ad4ca8e2a8a23b7db0657c2846f43937L18): Removed unnecessary `#include` directive for `azure/core/url.hpp`.
* [`sdk/core/azure-core-amqp/src/amqp/connection_string_credential.cpp`](diffhunk://#diff-14cf130dc5f0b51f698cca57724b733591d48fad0e5beb4745fc1cd78cbdaa72L7-L20): Removed unnecessary `#include` directives for `azure/core/amqp/internal/models/amqp_protocol.hpp`, `azure_c_shared_utility/azure_base64.h`, `azure_c_shared_utility/buffer_.h`, and `iostream`.
* [`sdk/core/azure-core-amqp/src/amqp/link.cpp`](diffhunk://#diff-249643f29ca2c0b1226e9d22ce90be83c77105f55221a1092fb605a5c7ead356L10-L21): Removed unnecessary `#include` directives for `azure/core/amqp/internal/message_receiver.hpp`, `azure/core/amqp/internal/message_sender.hpp`, `azure/core/amqp/internal/models/messaging_values.hpp`, and `azure_uamqp_c/amqp_definitions_sequence_no.h`.
* [`sdk/core/azure-core-amqp/src/amqp/management.cpp`](diffhunk://#diff-b03544340ff264961e045648c2c274dde9fa7ceb33b07b0c41a708128e28d40fL6-L16): Removed unnecessary `#include` directives for `azure/core/amqp/internal/connection.hpp`, `azure/core/amqp/internal/session.hpp`, and `azure_uamqp_c/amqp_management.h`.

Addition of `Close` methods:

* [`sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp`](diffhunk://#diff-fad00d7bec0f12ef5e7d36387e866ab5291c6f5f57568845dfabe3f23320c899R152-R159): Added a `Close` method to the `ConsumerClient` class.
* [`sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp`](diffhunk://#diff-cba1fbd1d786b763c27123a2284476e1cc8093753abe6e3a6af9ca78aae4d594R96-R113): Added a `Close` method to the `ProducerClient` class.

Modification of `cgmanifest.json` files:

* [`sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/cgmanifest.json`](diffhunk://#diff-465b096c14ed240a8f14180f16e0924e3df69bd1c95934dca435c0b3c97468a0R1-R37): Added a new `cgmanifest.json` file specifying development dependencies.
* [`sdk/eventhubs/azure-messaging-eventhubs/cgmanifest.json`](diffhunk://#diff-f3e86a1f7be148625ac80b2151504461f3faff4d0564db588c6df9d0d9eb0986R1-R48): Added a new `cgmanifest.json` file specifying development dependencies.
* 

---------

Co-authored-by: Rick Winter <rick.winter@microsoft.com>
This commit is contained in:
Larry Osterman 2024-02-22 20:41:18 -08:00 committed by GitHub
parent 1bb65192d4
commit aadd25dcd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 2907 additions and 327 deletions

14
.vscode/cspell.json vendored
View File

@ -343,6 +343,17 @@
"Sylvain"
]
},
{
"filename": "**/sdk/eventhubs/azure-messaging-eventhubs/**/*",
"words": [
"otlp",
"Otlp",
"OTLP",
"exportings",
"ricab",
"noarg"
]
},
{
"filename": "**/sdk/identity/azure-identity/**",
"words": [
@ -413,7 +424,8 @@
{
"filename": "**/vcpkg.json",
"words": [
"umock"
"umock",
"otlp"
]
}
],

View File

@ -9,9 +9,6 @@
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <iostream>
#include <sstream>
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;

View File

@ -15,7 +15,6 @@
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/core/url.hpp>
#include <azure/core/uuid.hpp>
#include <azure_uamqp_c/connection.h>

View File

@ -4,20 +4,16 @@
#include "azure/core/amqp/internal/connection_string_credential.hpp"
#include "azure/core/amqp/internal/connection.hpp"
#include "azure/core/amqp/internal/models/amqp_protocol.hpp"
#include "azure/core/amqp/internal/network/socket_transport.hpp"
#include <azure/core/base64.hpp>
#include <azure/core/url.hpp>
#include <azure_c_shared_utility/azure_base64.h>
#include <azure_c_shared_utility/buffer_.h>
#include <azure_c_shared_utility/sastoken.h>
#include <azure_c_shared_utility/strings.h>
#include <azure_c_shared_utility/urlencode.h>
#include <algorithm>
#include <iostream>
#include <iterator>
#include <unordered_map>
#include <vector>

View File

@ -7,18 +7,12 @@
#include "../models/private/performatives/transfer_impl.hpp"
#include "../models/private/value_impl.hpp"
#include "azure/core/amqp/internal/common/completion_operation.hpp"
#include "azure/core/amqp/internal/message_receiver.hpp"
#include "azure/core/amqp/internal/message_sender.hpp"
#include "azure/core/amqp/internal/models/message_source.hpp"
#include "azure/core/amqp/internal/models/message_target.hpp"
#include "azure/core/amqp/internal/models/messaging_values.hpp"
#include "private/link_impl.hpp"
#include "private/session_impl.hpp"
#include <azure_uamqp_c/amqp_definitions_sequence_no.h>
#include <azure_uamqp_c/link.h>
#include <azure_uamqp_c/session.h>
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
#if defined(TESTING_BUILD)

View File

@ -3,9 +3,7 @@
#include "azure/core/amqp/internal/management.hpp"
#include "azure/core/amqp/internal/connection.hpp"
#include "azure/core/amqp/internal/models/messaging_values.hpp"
#include "azure/core/amqp/internal/session.hpp"
#include "azure/core/amqp/models/amqp_message.hpp"
#include "private/connection_impl.hpp"
#include "private/management_impl.hpp"
@ -13,8 +11,6 @@
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure_uamqp_c/amqp_management.h>
#include <iostream>
#include <memory>
#include <string>

View File

@ -8,24 +8,18 @@
#include "../models/private/message_impl.hpp"
#include "../models/private/value_impl.hpp"
#include "azure/core/amqp/internal/connection.hpp"
#include "azure/core/amqp/internal/connection_string_credential.hpp"
#include "azure/core/amqp/internal/link.hpp"
#include "azure/core/amqp/internal/models/messaging_values.hpp"
#include "azure/core/amqp/internal/session.hpp"
#include "azure/core/amqp/models/amqp_message.hpp"
#include "private/message_receiver_impl.hpp"
#include <azure/core/credentials/credentials.hpp>
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/core/platform.hpp>
#include <azure_uamqp_c/message_receiver.h>
#include <iostream>
#include <memory>
#include <sstream>
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;

View File

@ -7,16 +7,12 @@
#include "../models/private/error_impl.hpp"
#include "../models/private/message_impl.hpp"
#include "../models/private/value_impl.hpp"
#include "azure/core/amqp/internal/claims_based_security.hpp"
#include "azure/core/amqp/internal/common/completion_operation.hpp"
#include "azure/core/amqp/internal/models/messaging_values.hpp"
#include "azure/core/amqp/internal/session.hpp"
#include "azure/core/amqp/models/amqp_message.hpp"
#include "private/connection_impl.hpp"
#include "private/message_sender_impl.hpp"
#include "private/session_impl.hpp"
#include <azure/core/credentials/credentials.hpp>
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/core/platform.hpp>

View File

@ -9,7 +9,6 @@
#include "unique_handle.hpp"
#include <azure/core/credentials/credentials.hpp>
#include <azure/core/platform.hpp>
#include <azure_uamqp_c/connection.h>

View File

@ -13,7 +13,6 @@
#include <azure_uamqp_c/link.h>
#include <chrono>
#include <memory>
#include <string>
#include <vector>

View File

@ -4,21 +4,15 @@
#pragma once
#include "azure/core/amqp/internal/message_receiver.hpp"
#include "claims_based_security_impl.hpp"
#include "connection_impl.hpp"
#include "link_impl.hpp"
#include "message_receiver_impl.hpp"
#include "session_impl.hpp"
#include "unique_handle.hpp"
#include <azure/core/credentials/credentials.hpp>
#include <azure_uamqp_c/amqpvalue.h>
#include <azure_uamqp_c/message.h>
#include <azure_uamqp_c/message_receiver.h>
#include <memory>
#include <vector>
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
template <> struct UniqueHandleHelper<MESSAGE_RECEIVER_INSTANCE_TAG>

View File

@ -4,14 +4,11 @@
#pragma once
#include "azure/core/amqp/internal/message_sender.hpp"
#include "claims_based_security_impl.hpp"
#include "link_impl.hpp"
#include "unique_handle.hpp"
#include <azure_uamqp_c/message_sender.h>
#include <tuple>
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
template <> struct UniqueHandleHelper<MESSAGE_SENDER_INSTANCE_TAG>
{

View File

@ -10,10 +10,8 @@
#include <azure_uamqp_c/session.h>
#include <chrono>
#include <memory>
#include <string>
#include <vector>
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
template <> struct UniqueHandleHelper<SESSION_INSTANCE_TAG>

View File

@ -4,10 +4,7 @@
"supports": "!(windows & !static)",
"dependencies": [
"azure-core-cpp",
{
"name": "opentelemetry-cpp",
"platform": "!(windows & !static)"
},
"opentelemetry-cpp",
{
"name": "vcpkg-cmake",
"host": true

View File

@ -0,0 +1,37 @@
{
"$schema": "https://json.schemastore.org/component-detection-manifest.json",
"Registrations": [
{
"Component": {
"Type": "git",
"git": {
"RepositoryUrl": "https://github.com/google/googletest",
"CommitHash": "703bd9caab50b139428cea1aaff9974ebee5742e"
}
},
"DevelopmentDependency": true
},
{
"Component": {
"Type": "other",
"Other": {
"Name": "clang-format",
"Version": "9.0.0-2",
"DownloadUrl": "https://ubuntu.pkgs.org/18.04/ubuntu-updates-universe-amd64/clang-format-9_9-2~ubuntu18.04.2_amd64.deb.html"
}
},
"DevelopmentDependency": true
},
{
"Component": {
"Type": "other",
"Other": {
"Name": "doxygen",
"Version": "1.8.20",
"DownloadUrl": "http://doxygen.nl/files/doxygen-1.8.20-setup.exe"
}
},
"DevelopmentDependency": true
}
]
}

View File

@ -0,0 +1,48 @@
{
"$schema": "https://json.schemastore.org/component-detection-manifest.json",
"Registrations": [
{
"Component": {
"Type": "git",
"git": {
"RepositoryUrl": "https://github.com/google/googletest",
"CommitHash": "703bd9caab50b139428cea1aaff9974ebee5742e"
}
},
"DevelopmentDependency": true
},
{
"Component": {
"Type": "git",
"git": {
"RepositoryUrl": "https://github.com/ricab/scope_guard",
"CommitHash": "71a04528184db1749dd08ebbbf4daf3b5dca21fd",
"Branch": "master"
}
},
"DevelopmentDependency": true
},
{
"Component": {
"Type": "other",
"Other": {
"Name": "clang-format",
"Version": "9.0.0-2",
"DownloadUrl": "https://ubuntu.pkgs.org/18.04/ubuntu-updates-universe-amd64/clang-format-9_9-2~ubuntu18.04.2_amd64.deb.html"
}
},
"DevelopmentDependency": true
},
{
"Component": {
"Type": "other",
"Other": {
"Name": "doxygen",
"Version": "1.8.20",
"DownloadUrl": "http://doxygen.nl/files/doxygen-1.8.20-setup.exe"
}
},
"DevelopmentDependency": true
}
]
}

View File

@ -149,6 +149,14 @@ namespace Azure { namespace Messaging { namespace EventHubs {
PartitionClientOptions const& options = {},
Azure::Core::Context const& context = {});
/**
* @brief Closes the consumer client canceling any operations outstanding on any of the existing
* partition clients.
*
* @param context The context for the operation can be used for request cancellation.
*/
void Close(Azure::Core::Context const& context);
/**@brief GetEventHubProperties gets properties of an eventHub. This includes data
* like name, and partitions.
*

View File

@ -92,13 +92,29 @@ namespace Azure { namespace Messaging { namespace EventHubs {
std::shared_ptr<Azure::Core::Credentials::TokenCredential> credential,
ProducerClientOptions options = {});
~ProducerClient()
~ProducerClient() {}
/** @brief Close all the connections and sessions.
*
* @param context Context for the operation can be used for request cancellation.
*/
void Close(Azure::Core::Context const& context = {})
{
for (auto& sender : m_senders)
{
sender.second.Close();
sender.second.Close(context);
}
m_senders.clear();
// Other possible things we might want to do in close, but cannot quite do yet because it
// doesn't necessarily work correctly.
// for (auto& session : m_sessions)
// {
// session.second.Close(context);
// }
// for (auto& connection : m_connections)
// {
// connection.second.Close(context);
// }
}
/** @brief Create a new EventDataBatch to be sent to the Event Hub.

View File

@ -67,6 +67,15 @@ namespace Azure { namespace Messaging { namespace EventHubs {
};
}
void ConsumerClient::Close(Azure::Core::Context const& context)
{
for (auto& sender : m_receivers)
{
sender.second.Close(context);
}
m_receivers.clear();
}
Azure::Core::Amqp::_internal::Connection ConsumerClient::CreateConnection(
std::string const& partitionId) const
{

View File

@ -1,19 +1,13 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
cmake_minimum_required(VERSION 3.13)
# CMakeList.txt : Top-level CMake project file, do global configuration
# and include sub-projects here.
#
cmake_minimum_required (VERSION 3.13)
project(azure-messaging-eventhubs-stress-test LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
add_executable(
azure-messaging-eventhubs-stress-test
eventhubs_stress_test.cpp
)
target_link_libraries(azure-messaging-eventhubs-stress-test PRIVATE azure-messaging-eventhubs azure-core azure-identity)
create_map_file(azure-messaging-eventhubs-stress-test azure-messaging-eventhubs-stress-test.map)
file(COPY ${CMAKE_CURRENT_BINARY_DIR}
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)
# Include sub-projects.
if (BUILD_TESTING)
# stress tests are categorized as normal tests.
add_subdirectory ("src")
endif()

View File

@ -1,6 +0,0 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.2.0
digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0
generated: "2023-07-24T13:06:32.3351603-07:00"

View File

@ -6,7 +6,7 @@ FROM mcr.microsoft.com/mirror/docker/library/ubuntu:22.04 as build
# install the mem check tool along side the other deps
RUN apt-get update -y
RUN apt-get install -y gcc cmake make g++ git zip unzip build-essential pkg-config wget curl valgrind
RUN apt-get install -y gcc cmake make g++ git zip unzip build-essential pkg-config wget curl valgrind python3
RUN wget -O vcpkg.tar.gz https://github.com/microsoft/vcpkg/archive/master.tar.gz
RUN mkdir /opt/vcpkg
RUN tar xf vcpkg.tar.gz --strip-components=1 -C /opt/vcpkg
@ -22,7 +22,7 @@ WORKDIR /build
# version of the packages that will be fetched.
# So when building from root we need to match the two values. When not building from root if the vcpkg file
# does not specify a baseline the value set in the cmake file will ensure that we are at the desired level.
RUN cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=ON -DBUILD_TRANSPORT_CURL=ON /src
RUN --mount=type=cache,target=/root/.cache/vcpkg/archives --mount=type=cache,target=/root/.m2 cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=ON -DBUILD_TRANSPORT_CURL=ON /src
RUN cmake --build . --target azure-messaging-eventhubs-stress-test
FROM mcr.microsoft.com/mirror/docker/library/ubuntu:22.04
@ -32,7 +32,7 @@ RUN apt-get install -y valgrind
WORKDIR /
# copy the target binary
COPY --from=build ./build/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/azure-messaging-eventhubs-stress-test ./azure-messaging-eventhubs-stress-test
COPY --from=build ./build/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/azure-messaging-eventhubs-stress-test ./azure-messaging-eventhubs-stress-test
RUN chmod +x ./azure-messaging-eventhubs-stress-test
CMD ./azure-messaging-eventhubs-stress-test

View File

@ -26,4 +26,4 @@ Obviously after logging in to the acr "az acr login -n <acr>"
To use another image you will need to go to line 12 in deploy job and update with your new file.
Once the deploy succeeds run " kubectl logs -n azuresdkforcpp -f libcurl-stress-test" to grab the logs in real time .
After deployment succeeds, run " kubectl logs -n azuresdkforcpp -f eventhubs-stress-test" to grab the logs in real time .

View File

@ -1,241 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @brief Validates the Azure Core transport adapters with fault responses from server.
*
* @note This test requires the Http-fault-injector
* (https://github.com/Azure/azure-sdk-tools/tree/main/tools/http-fault-injector) running. Follow
* the instructions to install and run the server before running this test.
*
*/
#define REQUESTS 100
#define WARMUP 100
#define ROUNDS 100
#include <azure/core.hpp>
#include <azure/core/internal/environment.hpp>
#include <azure/identity/client_secret_credential.hpp>
#include <azure/messaging/eventhubs/consumer_client.hpp>
#include <azure/messaging/eventhubs/producer_client.hpp>
#include <iostream>
using namespace Azure::Messaging::EventHubs;
class EventHubsStress {
public:
EventHubsStress()
{
m_eventHubName = Azure::Core::_internal::Environment::GetVariable("EVENTHUB_NAME");
m_eventHubConnectionString
= Azure::Core::_internal::Environment::GetVariable("EVENTHUB_CONNECTION_STRING");
m_checkpointStoreConnectionString
= Azure::Core::_internal::Environment::GetVariable("CHECKPOINT_STORE_CONNECTION_STRING");
m_numberToSend = 100;
m_batchSize = 100;
m_prefetchCount = 10;
m_messageBodySize = 1024;
m_tenantId = Azure::Core::_internal::Environment::GetVariable("AZURE_TENANT_ID");
m_clientId = Azure::Core::_internal::Environment::GetVariable("AZURE_CLIENT_ID");
m_secret = Azure::Core::_internal::Environment::GetVariable("AZURE_CLIENT_SECRET");
if (m_eventHubConnectionString.empty())
{
m_credential = std::make_shared<Azure::Identity::ClientSecretCredential>(
m_tenantId, m_clientId, m_secret);
m_client = std::make_unique<Azure::Messaging::EventHubs::ProducerClient>(
m_eventHubConnectionString, m_eventHubName, m_credential);
}
else
{
m_client = std::make_unique<Azure::Messaging::EventHubs::ProducerClient>(
m_eventHubConnectionString, m_eventHubName);
}
}
void Warmup(int repetitions)
{
for (int i = 0; i < repetitions; i++)
{
std::cout << "Warmup " << i << std::endl;
SendMessages();
ReceiveMessages();
}
}
void Run(int repetitions)
{
for (int i = 0; i < repetitions; i++)
{
std::cout << "Run " << i << std::endl;
SendMessages();
ReceiveMessages();
}
}
void Cleanup() {}
private:
std::string m_eventHubName;
std::string m_eventHubConnectionString;
std::string m_checkpointStoreConnectionString;
std::string m_partitionId{"0"};
std::string m_tenantId;
std::string m_clientId;
std::string m_secret;
uint32_t m_numberToSend;
uint32_t m_batchSize;
uint32_t m_prefetchCount;
size_t m_messageBodySize;
int m_rounds{10};
std::unique_ptr<Azure::Messaging::EventHubs::ProducerClient> m_client;
std::shared_ptr<Azure::Core::Credentials::TokenCredential> m_credential;
Models::StartPosition m_receiveStartPosition;
void SendEventsToPartition(Azure::Core::Context const& context)
{
auto beforeSendProps = m_client->GetPartitionProperties(m_partitionId, context);
std::vector<uint8_t> bodyData(m_messageBodySize, 'a');
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = m_partitionId;
Azure::Messaging::EventHubs::EventDataBatch batch(m_client->CreateBatch(batchOptions));
for (uint32_t j = 0; j < m_numberToSend; ++j)
{
Azure::Messaging::EventHubs::Models::EventData event;
event.Body = bodyData;
event.Properties["Number"] = j;
event.Properties["PartitionId"]
= static_cast<Azure::Core::Amqp::Models::AmqpValue>(m_partitionId);
AddEndProperty(event, m_numberToSend);
batch.TryAddMessage(event);
}
m_client->Send(batch, context);
auto afterSendProps = m_client->GetPartitionProperties(m_partitionId, context);
m_receiveStartPosition.Inclusive = false;
m_receiveStartPosition.SequenceNumber = beforeSendProps.LastEnqueuedSequenceNumber;
}
void AddEndProperty(Azure::Messaging::EventHubs::Models::EventData& event, uint64_t expectedCount)
{
event.Properties["End"] = expectedCount;
}
void SendMessages()
{
try
{
Azure::Core::Context context;
SendEventsToPartition(context);
}
catch (std::exception const& ex)
{
std::cerr << "Exception " << ex.what();
throw;
}
}
void ReceiveMessages()
{
try
{
Azure::Core::Context context;
ConsumerClientOptions clientOptions;
clientOptions.ApplicationID = "StressConsumerClient";
ConsumerClient consumerClient(
m_eventHubConnectionString, m_eventHubName, DefaultConsumerGroup, clientOptions);
auto consumerProperties = consumerClient.GetEventHubProperties(context);
std::cout << "Starting receive tests for partition " << m_partitionId << std::endl;
std::cout << " Start position: " << m_receiveStartPosition << std::endl;
for (auto round = 0; round < m_rounds; round += 1)
{
ConsumeForBatchTester(round, consumerClient, m_receiveStartPosition, context);
}
}
catch (std::exception const& ex)
{
std::cerr << "Exception " << ex.what();
throw;
}
}
void ConsumeForBatchTester(
uint32_t round,
ConsumerClient& client,
Models::StartPosition const& startPosition,
Azure::Core::Context const& context)
{
PartitionClientOptions partitionOptions;
partitionOptions.StartPosition = startPosition;
partitionOptions.Prefetch = m_prefetchCount;
PartitionClient partitionClient{client.CreatePartitionClient(m_partitionId, partitionOptions)};
std::cout << "[r: " << round << "/" << m_rounds << "p: " << m_partitionId
<< "] Starting to receive messages from partition" << std::endl;
size_t total = 0;
// uint32_t numCancels = 0;
// constexpr const uint32_t cancelLimit = 5;
auto events = partitionClient.ReceiveEvents(m_batchSize, context);
total += events.size();
std::cout << "Total: " << total << std::endl;
}
};
int main(int argc, char**)
{
try
{
EventHubsStress stressTest;
// some param was passed to the program, doesn't matter what it is,
// it is meant for the moment to just run a quick iteration to check for sanity of the test.
// since prototype TODO: pass in warmup/rounds/requests as params.
if (argc != 1)
{
std::cout << "--------------\tBUILD TEST\t--------------" << std::endl;
stressTest.Warmup(1);
stressTest.Run(5);
stressTest.Cleanup();
std::cout << "--------------\tEND BUILD TEST\t--------------" << std::endl;
return 0;
}
std::cout << "--------------\tSTARTING TEST\t--------------" << std::endl;
std::cout << "--------------\tPRE WARMUP\t--------------" << std::endl;
stressTest.Warmup(WARMUP);
std::cout << "--------------\tPOST WARMUP\t--------------" << std::endl;
for (int i = 0; i < ROUNDS; i++)
{
std::cout << "--------------\tTEST ITERATION:" << i << "\t--------------" << std::endl;
stressTest.Run(REQUESTS);
std::cout << "--------------\tDONE ITERATION:" << i << "\t--------------" << std::endl;
}
stressTest.Cleanup();
}
catch (std::exception const& ex)
{
std::cerr << "Test failed due to exception thrown: " << ex.what() << std::endl;
}
return 0;
}

View File

@ -10,7 +10,7 @@ matrix:
image: Dockerfile
imageBuildDir: "../../../../../"
scenarios:
constantDetach:
produceConsumeEvents:
testTarget: azure-messaging-eventhubs-stress-test
memory: "1.5Gi"

View File

@ -0,0 +1,60 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# cspell: words otlp
cmake_minimum_required(VERSION 3.13)
project(azure-messaging-eventhubs-stress-test LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(WITH_STL ON)
find_package(opentelemetry-cpp CONFIG REQUIRED)
find_package(protobuf)
find_package(nlohmann_json)
find_package(CURL)
set(INCLUDE_FILES
inc/eventhubs_stress_scenarios.hpp
scenarios/inc/event_sender.hpp
scenarios/inc/opentelemetry_helpers.hpp
scenarios/inc/batch_stress_tests.hpp
)
set(SOURCE_FILES
eventhubs_stress_test.cpp
scenarios/src/opentelemetry_helpers.cpp
scenarios/src/batch_stress_tests.cpp
)
add_executable(
azure-messaging-eventhubs-stress-test
${SOURCE_FILES} ${INCLUDE_FILES}
)
# Include the headers from the project.
target_include_directories(
azure-messaging-eventhubs-stress-test
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/inc>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/scenarios/inc>
)
target_link_libraries(azure-messaging-eventhubs-stress-test
PRIVATE
azure-messaging-eventhubs
azure-identity
opentelemetry-cpp::ostream_span_exporter
opentelemetry-cpp::in_memory_span_exporter
opentelemetry-cpp::otlp_http_exporter
opentelemetry-cpp::otlp_http_log_record_exporter
opentelemetry-cpp::sdk
)
create_map_file(azure-messaging-eventhubs-stress-test azure-messaging-eventhubs-stress-test.map)
file(COPY ${CMAKE_CURRENT_BINARY_DIR}
DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin)

View File

@ -0,0 +1,296 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @brief Stress framework for EventHubs service client.
*
*/
#include "argagg.hpp"
#include "batch_stress_tests.hpp"
#include "eventhubs_stress_scenarios.hpp"
#include "opentelemetry/sdk/logs/simple_log_record_processor_factory.h"
#include <algorithm>
#include <cctype>
#include <iostream>
#include <memory>
#include <opentelemetry/exporters/otlp/otlp_http_exporter_factory.h>
#include <opentelemetry/exporters/otlp/otlp_http_exporter_options.h>
#include <opentelemetry/exporters/otlp/otlp_http_log_record_exporter_factory.h>
#include <opentelemetry/exporters/otlp/otlp_http_log_record_exporter_options.h>
#include <opentelemetry/logs/provider.h>
#include <opentelemetry/sdk/common/global_log_handler.h>
#include <opentelemetry/sdk/logs/logger_context.h>
#include <opentelemetry/sdk/logs/logger_provider_factory.h>
#include <opentelemetry/sdk/trace/processor.h>
#include <opentelemetry/sdk/trace/simple_processor_factory.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/sdk/trace/tracer_provider_factory.h>
#include <opentelemetry/trace/provider.h>
namespace trace_sdk = opentelemetry::sdk::trace;
namespace trace = opentelemetry::trace;
namespace logs_sdk = opentelemetry::sdk::logs;
namespace logs = opentelemetry::logs;
namespace otlp = opentelemetry::exporter::otlp;
namespace internal_log = opentelemetry::sdk::common::internal_log;
void InitTracer()
{
opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpExporterFactory::Create(opts);
auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter));
std::shared_ptr<opentelemetry::trace::TracerProvider> provider
= trace_sdk::TracerProviderFactory::Create(std::move(processor));
// Set the global trace provider
trace::Provider::SetTracerProvider(provider);
}
// On debug builds, we log to the console. On release builds, we log to OpenTelemetry.
#if defined(_DEBUG)
constexpr const bool LogDefault = true;
#else
constexpr const bool LogDefault = false;
#endif
bool LogToConsole{LogDefault};
void InitLogger()
{
if (LogToConsole)
{
std::cout << "Using console to export log records." << std::endl;
}
else
{
opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts;
std::cout << "Using " << logger_opts.url << " to export log records." << std::endl;
logger_opts.console_debug = false;
// Create OTLP exporter instance
auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts);
auto processor = logs_sdk::SimpleLogRecordProcessorFactory::Create(std::move(exporter));
std::shared_ptr<logs::LoggerProvider> provider
= logs_sdk::LoggerProviderFactory::Create(std::move(processor));
// Set the global log provider.
logs::Provider::SetLoggerProvider(provider);
// Integrate the azure logging diagnostics with the OpenTelemetry logger provider we just
// created.
Azure::Core::Diagnostics::Logger::SetListener(
[](Azure::Core::Diagnostics::Logger::Level level, std::string const& message) {
logs::Severity logSeverity{logs::Severity::kInvalid};
switch (level)
{
case Azure::Core::Diagnostics::Logger::Level::Error:
logSeverity = logs::Severity::kError;
break;
case Azure::Core::Diagnostics::Logger::Level::Informational:
logSeverity = logs::Severity::kInfo;
break;
case Azure::Core::Diagnostics::Logger::Level::Verbose:
logSeverity = logs::Severity::kDebug;
break;
case Azure::Core::Diagnostics::Logger::Level::Warning:
logSeverity = logs::Severity::kWarn;
break;
}
logs::Provider::GetLoggerProvider()
->GetLogger(EventHubsLoggerName)
->Log(logSeverity, message);
});
internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Error);
}
}
void CleanupTracer()
{
// We call ForceFlush to prevent to cancel running exports, It's optional.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::TracerProvider> provider
= trace::Provider::GetTracerProvider();
if (provider)
{
static_cast<trace_sdk::TracerProvider*>(provider.get())->ForceFlush();
}
std::shared_ptr<opentelemetry::trace::TracerProvider> none;
trace::Provider::SetTracerProvider(none);
}
void Usage(
argagg::parser const& argparser,
const std::vector<std::shared_ptr<EventHubsStressScenario>>& scenarios)
{
argagg::fmt_ostream fmt(std::cerr);
fmt << "Usage azure-messaging-eventhubs-stress-test [options] " << std::endl << argparser;
fmt << std::endl;
fmt << "Scenario Options:" << std::endl;
for (const auto& scenario : scenarios)
{
fmt << "Scenario: " << scenario->GetStressScenarioName() << std::endl;
for (const auto& option : scenario->GetScenarioOptions())
{
fmt << " ";
for (auto& arg : option.Activators)
{
fmt << arg;
if (arg != option.Activators.back())
{
fmt << ", ";
}
}
fmt << "\n " << option.HelpMessage << '\n';
}
}
}
bool CompareString(std::string lhs, std::string rhs)
{
std::transform(lhs.begin(), lhs.end(), lhs.begin(), tolower);
std::transform(rhs.begin(), rhs.end(), rhs.begin(), tolower);
return lhs == rhs;
}
int main(int argc, char** argv)
{
try
{
std::vector<std::shared_ptr<EventHubsStressScenario>> scenarios{
std::make_shared<BatchStressTest>(),
};
// Determine the stress scenario to run.
// Parse the command line in "positional only" mode. The first argument is the scenario name.
std::shared_ptr<EventHubsStressScenario> scenario;
{
argagg::parser argparser;
argagg::parser_results args;
try
{
args = argparser.parse(argc, argv, true);
}
catch (const std::exception& e)
{
std::cerr << "Exception thrown parsing command line: " << e.what();
Usage(argparser, scenarios);
return -1;
}
std::string scenarioName;
if (args.pos.size() > 0)
{
scenarioName = args.pos[0];
}
else
{
std::cerr << "No scenario name provided." << std::endl;
Usage(argparser, scenarios);
return -1;
}
for (const auto& scenarioToCheck : scenarios)
{
if (CompareString(scenarioToCheck->GetStressScenarioName(), scenarioName))
{
scenario = scenarioToCheck;
break;
}
}
if (!scenario)
{
std::cerr << "Unknown scenario name " << scenarioName << "." << std::endl;
std::cerr << "Known scenarios are:" << std::endl;
for (const auto& scenarioToCheck : scenarios)
{
std::cerr << " " << scenarioToCheck->GetStressScenarioName();
}
Usage(argparser, scenarios);
return -1;
}
}
std::cout << "Running stress scenario " << scenario->GetStressScenarioName() << std::endl;
// Now we know the scenario, reparse the command line using the scenario specific options. We
// also support
auto scenarioOptions{scenario->GetScenarioOptions()};
argagg::parser argparser{{
{"console", {"--console"}, "Log output traces to console", 0},
{"help", {"-?", "-h", "--help"}, "This help message.", 0},
{"verbose", {"-v", "--verbose"}, "Enable verbose logging", 0},
}};
// Add the scenario specific options to the parser.
for (const auto& option : scenarioOptions)
{
argparser.definitions.push_back(
{option.Name, option.Activators, option.HelpMessage, option.ExpectedArgs});
}
// Re-parse the command line with this scenario's options.
argagg::parser_results args;
try
{
args = argparser.parse(argc, argv);
}
catch (const std::exception& e)
{
std::cerr << "Exception thrown parsing command line: " << e.what();
Usage(argparser, scenarios);
return -1;
}
// Log to the console or to OpenTelemetry logs.
LogToConsole = args["console"].as<bool>(LogDefault);
if (args.has_option("help"))
{
Usage(argparser, scenarios);
return 0;
}
// Initialize OpenTelemetry Tracers and Loggers.
// TBD: Metrics.
InitTracer();
InitLogger();
if (args.has_option("verbose"))
{
Azure::Core::Diagnostics::Logger::SetLevel(Azure::Core::Diagnostics::Logger::Level::Verbose);
}
else
{
Azure::Core::Diagnostics::Logger::SetLevel(
Azure::Core::Diagnostics::Logger::Level::Informational);
}
std::cout << "===\tINITIALIZE TEST\t===" << std::endl;
scenario->Initialize(args);
std::cout << "===\tRUN TEST\t===" << std::endl;
scenario->Run();
std::cout << "===\tCLEANUP TEST\t===" << std::endl;
scenario->Cleanup();
}
catch (std::exception const& ex)
{
std::cerr << "Test failed due to exception thrown: " << ex.what() << std::endl;
}
CleanupTracer();
return 0;
}

View File

@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include "argagg.hpp"
constexpr const char* EventHubsLoggerName = "eventhubs_stress_test";
extern bool LogToConsole;
struct EventHubsScenarioOptions
{
/**
* @brief The name of the scenario option.
*
*/
std::string Name;
/**
* @brief The list of sentinels for parsing the option from command line. i. e. [`-o`,
* `--option`].
*
*/
std::vector<std::string> Activators;
/**
* @brief The message that is displayed in the command line when help is requested.
*
*/
std::string HelpMessage;
/**
* @brief The number of arguments expected after the sentinel for the test option.
*
*/
uint16_t ExpectedArgs;
/**
* @brief Make an option to be mandatory to run the test.
*
*/
bool Required = false;
/**
* @brief Make the option to be replaced with **** on all outputs
*
*/
bool SensitiveData = false;
};
class EventHubsStressScenario {
public:
EventHubsStressScenario(){};
virtual const std::string& GetStressScenarioName() = 0;
virtual const std::vector<EventHubsScenarioOptions>& GetScenarioOptions() = 0;
virtual void Initialize(argagg::parser_results const& parserResults) = 0;
virtual void Run() = 0;
virtual void Cleanup() = 0;
protected:
virtual ~EventHubsStressScenario(){};
};

View File

@ -0,0 +1,66 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "event_sender.hpp"
#include "eventhubs_stress_scenarios.hpp"
#include <azure/core/internal/environment.hpp>
#include <azure/identity/client_secret_credential.hpp>
#include <azure/messaging/eventhubs/consumer_client.hpp>
#include <azure/messaging/eventhubs/producer_client.hpp>
#include <opentelemetry/trace/tracer.h>
class BatchStressTest : public EventHubsStressScenario {
public:
BatchStressTest();
private:
std::string m_eventHubName;
std::string m_eventHubNamespace;
std::string m_eventHubConnectionString;
std::string m_checkpointStoreConnectionString;
std::string m_partitionId{"0"};
std::string m_tenantId;
std::string m_clientId;
std::string m_secret;
uint32_t m_numberToSend{10000000};
uint32_t m_batchSize{1000};
std::chrono::system_clock::duration m_batchDuration{std::chrono::seconds(60)};
uint32_t m_prefetchCount{0};
std::uint32_t m_rounds{100};
std::uint32_t m_paddingBytes{1024};
std::uint32_t m_maxTimeouts{10};
bool m_verbose{false};
bool m_useSasCredential{false};
std::string m_scenarioName{"BatchStressTest"};
void SendEventsToPartition(
std::unique_ptr<Azure::Messaging::EventHubs::ProducerClient> const& producerClient,
Azure::Core::Context const& context);
void AddEndProperty(
Azure::Messaging::EventHubs::Models::EventData& event,
uint64_t expectedCount);
std::pair<
Azure::Messaging::EventHubs::Models::StartPosition,
Azure::Messaging::EventHubs::Models::EventHubPartitionProperties>
SendMessages();
void ReceiveMessages(Azure::Messaging::EventHubs::Models::StartPosition const& startPosition);
void ConsumeForBatchTester(
uint32_t round,
Azure::Messaging::EventHubs::ConsumerClient& client,
Azure::Messaging::EventHubs::Models::StartPosition const& startPosition,
Azure::Core::Context const& context) const;
// Inherited via EventHubsStressScenario
const std::string& GetStressScenarioName() override;
const std::vector<EventHubsScenarioOptions>& GetScenarioOptions() override;
void Initialize(argagg::parser_results const& parserResults) override;
void Run() override;
void Cleanup() override;
};

View File

@ -0,0 +1,114 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include "opentelemetry_helpers.hpp"
#include <azure/core/context.hpp>
#include <azure/messaging/eventhubs/models/event_data.hpp>
#include <azure/messaging/eventhubs/models/management_models.hpp>
#include <azure/messaging/eventhubs/models/partition_client_models.hpp>
#include <azure/messaging/eventhubs/producer_client.hpp>
#include <memory>
struct EventSenderOptions
{
std::string PartitionId;
std::uint32_t MessageLimit;
std::uint32_t NumberOfExtraBytes;
};
class EventSender {
public:
static std::pair<
Azure::Messaging::EventHubs::Models::StartPosition,
Azure::Messaging::EventHubs::Models::EventHubPartitionProperties>
SendEventsToPartition(
std::unique_ptr<Azure::Messaging::EventHubs::ProducerClient> const& producerClient,
EventSenderOptions const& senderOptions,
Azure::Core::Context const& context)
{
auto sendEventsScope{CreateStressSpan("SendEventsToPartition")};
std::cout << "[BEGIN] Sending " << senderOptions.MessageLimit << " messages to partition "
<< senderOptions.PartitionId << ", with messages of size "
<< senderOptions.NumberOfExtraBytes << std::endl;
Azure::Messaging::EventHubs::Models::EventHubPartitionProperties beforeSendProps;
{
auto getPropertiesSpan{
CreateStressSpan("SendEventsToPartition::GetPartitionProperties begin")};
beforeSendProps = producerClient->GetPartitionProperties(senderOptions.PartitionId, context);
}
std::vector<uint8_t> bodyData(senderOptions.NumberOfExtraBytes, 'a');
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = senderOptions.PartitionId;
Azure::Messaging::EventHubs::EventDataBatch batch{producerClient->CreateBatch(batchOptions)};
for (uint32_t j = 0; j < senderOptions.MessageLimit; ++j)
{
Azure::Messaging::EventHubs::Models::EventData event;
event.Body = bodyData;
event.Properties["Number"] = j;
event.Properties["PartitionID"]
= static_cast<Azure::Core::Amqp::Models::AmqpValue>(senderOptions.PartitionId);
if (j == senderOptions.MessageLimit)
{
AddEndProperty(event, senderOptions.MessageLimit);
}
{
auto batchAddMessageSpan{CreateStressSpan("SendEventsToPartition::BatchTryAddMessage")};
if (!batch.TryAddMessage(event))
{
if (batch.CurrentSize() == 0)
{
std::cerr << "Single message could not fit in batch";
throw std::runtime_error("Single message could not fit in batch");
}
auto sendBatchSpan{CreateStressSpan("SendBatch")};
{
producerClient->Send(batch, context);
}
batch = producerClient->CreateBatch(batchOptions);
j -= 1; // Retry adding the same message.
}
}
}
if (batch.CurrentSize() > 0)
{
auto sendBatchSpan{CreateStressSpan("SendBatch")};
{
sendBatchSpan.first->AddEvent("Send events", {{"event count", senderOptions.MessageLimit}});
producerClient->Send(batch, context);
}
}
{
auto getPartitionPropertiesSpan{CreateStressSpan("GetPartitionProperties")};
auto afterSendProps
= producerClient->GetPartitionProperties(senderOptions.PartitionId, context);
getPartitionPropertiesSpan.first->AddEvent(
"After Properties", {{"sequenceNumber", beforeSendProps.LastEnqueuedSequenceNumber}});
Azure::Messaging::EventHubs::Models::StartPosition afterStartPosition;
afterStartPosition.Inclusive = false;
afterStartPosition.SequenceNumber = beforeSendProps.LastEnqueuedSequenceNumber;
std::cout << "[END] Sending " << senderOptions.MessageLimit << " messages to partition "
<< senderOptions.PartitionId << " with messages of size "
<< senderOptions.NumberOfExtraBytes << "b" << std::endl;
return std::make_pair(afterStartPosition, afterSendProps);
}
}
private:
static void AddEndProperty(
Azure::Messaging::EventHubs::Models::EventData& event,
uint64_t expectedCount)
{
event.Properties["End"] = expectedCount;
}
};

View File

@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <utility>
#include <opentelemetry/logs/provider.h>
#include <opentelemetry/sdk/logs/logger.h>
#include <opentelemetry/sdk/trace/tracer.h>
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/semantic_conventions.h>
opentelemetry::nostd::shared_ptr<opentelemetry::logs::Logger> GetLogger();
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> GetTracer();
std::pair<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>, opentelemetry::trace::Scope>
CreateStressSpan(std::string const& name);

View File

@ -0,0 +1,184 @@
/*
* Created on: 13/02/2018
* Author: ricab
*
* See README.md for documentation of this header's public interface.
*/
#ifndef SCOPE_GUARD_HPP_
#define SCOPE_GUARD_HPP_
#include <type_traits>
#include <utility>
#if __cplusplus >= 201703L
#define SG_NODISCARD [[nodiscard]]
#ifdef SG_REQUIRE_NOEXCEPT_IN_CPP17
#define SG_REQUIRE_NOEXCEPT
#endif
#else
#define SG_NODISCARD
#endif
namespace sg {
namespace detail {
/* --- Some custom type traits --- */
// Type trait determining whether a type is callable with no arguments
template <typename T, typename = void> struct is_noarg_callable_t : public std::false_type
{
}; // in general, false
template <typename T>
struct is_noarg_callable_t<T, decltype(std::declval<T&&>()())> : public std::true_type
{
}; // only true when call expression valid
// Type trait determining whether a no-argument callable returns void
template <typename T>
struct returns_void_t : public std::is_same<void, decltype(std::declval<T&&>()())>
{
};
/* Type trait determining whether a no-arg callable is nothrow invocable if
required. This is where SG_REQUIRE_NOEXCEPT logic is encapsulated. */
template <typename T>
struct is_nothrow_invocable_if_required_t
: public
#ifdef SG_REQUIRE_NOEXCEPT
std::is_nothrow_invocable<T> /* Note: _r variants not enough to
confirm void return: any return can be
discarded so all returns are
compatible with void */
#else
std::true_type
#endif
{
};
// logic AND of two or more type traits
template <typename A, typename B, typename... C> struct and_t : public and_t<A, and_t<B, C...>>
{
}; // for more than two arguments
template <typename A, typename B>
struct and_t<A, B> : public std::conditional<A::value, B, A>::type
{
}; // for two arguments
// Type trait determining whether a type is a proper scope_guard callback.
template <typename T>
struct is_proper_sg_callback_t : public and_t<
is_noarg_callable_t<T>,
returns_void_t<T>,
is_nothrow_invocable_if_required_t<T>,
std::is_nothrow_destructible<T>>
{
};
/* --- The actual scope_guard template --- */
template <
typename Callback,
typename = typename std::enable_if<is_proper_sg_callback_t<Callback>::value>::type>
class scope_guard;
/* --- Now the friend maker --- */
template <typename Callback>
detail::scope_guard<Callback> make_scope_guard(Callback&& callback) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value); /*
we need this in the inner namespace due to MSVC bugs preventing
sg::detail::scope_guard from befriending a sg::make_scope_guard
template instance in the parent namespace (see https://is.gd/xFfFhE). */
/* --- The template specialization that actually defines the class --- */
template <typename Callback> class SG_NODISCARD scope_guard<Callback> final {
public:
typedef Callback callback_type;
scope_guard(scope_guard&& other) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value);
~scope_guard() noexcept; // highlight noexcept dtor
void dismiss() noexcept;
public:
scope_guard() = delete;
scope_guard(const scope_guard&) = delete;
scope_guard& operator=(const scope_guard&) = delete;
scope_guard& operator=(scope_guard&&) = delete;
private:
explicit scope_guard(Callback&& callback) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value); /*
meant for friends only */
friend scope_guard<Callback> make_scope_guard<Callback>(Callback&&) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value); /*
only make_scope_guard can create scope_guards from scratch (i.e. non-move)
*/
private:
Callback m_callback;
bool m_active;
};
} // namespace detail
/* --- Now the single public maker function --- */
using detail::make_scope_guard; // see comment on declaration above
} // namespace sg
////////////////////////////////////////////////////////////////////////////////
template <typename Callback>
sg::detail::scope_guard<Callback>::scope_guard(Callback&& callback) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value)
: m_callback(std::forward<Callback>(callback)) /* use () instead of {} because
of DR 1467 (https://is.gd/WHmWuo), which still impacts older compilers
(e.g. GCC 4.x and clang <=3.6, see https://godbolt.org/g/TE9tPJ and
https://is.gd/Tsmh8G) */
,
m_active{true}
{
}
////////////////////////////////////////////////////////////////////////////////
template <typename Callback>
sg::detail::scope_guard<Callback>::scope_guard::~scope_guard() noexcept /*
need the extra injected-class-name here to make different compilers happy */
{
if (m_active)
m_callback();
}
////////////////////////////////////////////////////////////////////////////////
template <typename Callback>
sg::detail::scope_guard<Callback>::scope_guard(scope_guard&& other) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value)
: m_callback(std::forward<Callback>(other.m_callback)) // idem
,
m_active{std::move(other.m_active)}
{
other.m_active = false;
}
////////////////////////////////////////////////////////////////////////////////
template <typename Callback> inline void sg::detail::scope_guard<Callback>::dismiss() noexcept
{
m_active = false;
}
////////////////////////////////////////////////////////////////////////////////
template <typename Callback>
inline auto sg::detail::make_scope_guard(Callback&& callback) noexcept(
std::is_nothrow_constructible<Callback, Callback&&>::value) -> detail::scope_guard<Callback>
{
return detail::scope_guard<Callback>{std::forward<Callback>(callback)};
}
#endif /* SCOPE_GUARD_HPP_ */

View File

@ -0,0 +1,357 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "batch_stress_tests.hpp"
#include "opentelemetry_helpers.hpp"
#include "scope_guard.hpp"
#include <azure/identity/environment_credential.hpp>
using namespace Azure::Messaging::EventHubs;
namespace trace_sdk = opentelemetry::sdk::trace;
namespace trace = opentelemetry::trace;
namespace logs_sdk = opentelemetry::sdk::logs;
namespace logs = opentelemetry::logs;
namespace {
} // namespace
BatchStressTest::BatchStressTest() {}
namespace argagg { namespace convert {
// Convert a string to a std::chrono::system_clock::duration
template <> std::chrono::system_clock::duration arg(const char* s)
{
std::string str(s);
std::string number;
std::string unit;
for (auto c : str)
{
if (std::isdigit(c))
{
number.push_back(c);
}
else
{
unit.push_back(c);
}
}
auto value = std::stoll(number);
if (unit == "s")
{
return std::chrono::seconds(value);
}
else if (unit == "m")
{
return std::chrono::minutes(value);
}
else if (unit == "h")
{
return std::chrono::hours(value);
}
else if (unit == "ms")
{
return std::chrono::milliseconds(value);
}
else if (unit == "us")
{
return std::chrono::microseconds(value);
}
else
{
throw std::invalid_argument("Invalid duration unit: " + unit);
}
}
}} // namespace argagg::convert
const std::string& BatchStressTest::GetStressScenarioName() { return m_scenarioName; }
std::vector<EventHubsScenarioOptions> BatchScenarioOptions{
{"NumberToSend", {"-c", "--send"}, "Number of events to send", 1},
{"BatchSize",
{"-r", "--receive"},
"Size to request each time we call ReceiveEvents(). Higher batch sizes will require higher "
"amounts of memory for this test",
1},
{"BatchDuration", {"-t", "--timeout"}, "Time to wait for each batch (ie: 1m, 30s, etc...)", 1},
{"Prefetch",
{"-f", "--prefetch"},
"Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 0 "
"uses the default for the package",
1},
{"Rounds",
{"-n", "--rounds"},
"Number of rounds to run with these parameters. -1 means MAX_INT",
1},
{"PaddingBytes",
{"-P", "--padding"},
"Extra number of bytes to add onto each message body.",
1},
{"PartitionId",
{"-p", "--partition"},
"Partition ID to send events to and receive events from",
1},
{"MaxTimeouts",
{"-m", "--maxtimeouts"},
"Number of consecutive receive timeouts allowed before quitting",
0},
{"UseSasCredential",
{"-S", "--useSasCredential"},
"Use a SAS credential for authentication",
0},
{"SleepAfter", {"--sleepAfter"}, "Time to sleep after test completes", 1},
};
// Default option values.
// Note that the DefaultNumberToSend value is artificially reduced to 100 until the
// MessageSender::Open code fully supports Open.
constexpr const std::uint32_t DefaultNumberToSend = /*1000000*/ 100;
constexpr const std::uint32_t DefaultBatchSize = 1000;
constexpr const std::uint32_t DefaultPrefetch = 0;
constexpr const auto DefaultDuration = std::chrono::seconds(60);
constexpr const std::uint32_t DefaultRounds = 100;
constexpr const std::uint32_t DefaultPaddingBytes = 1024;
const std::string DefaultPartitionId{"0"}; // constexpr std::string is a c++17 feature.
constexpr const std::uint32_t DefaultMaxTimeouts = 10;
const std::vector<EventHubsScenarioOptions>& BatchStressTest::GetScenarioOptions()
{
return BatchScenarioOptions;
}
void BatchStressTest::Initialize(argagg::parser_results const& parserResults)
{
m_numberToSend = parserResults["NumberToSend"].as<uint32_t>(DefaultNumberToSend);
m_batchSize = parserResults["BatchSize"].as<uint32_t>(DefaultBatchSize);
m_batchDuration
= parserResults["BatchDuration"].as<std::chrono::system_clock::duration>(DefaultDuration);
m_prefetchCount = parserResults["Prefetch"].as<uint32_t>(DefaultPrefetch);
m_rounds = parserResults["Rounds"].as<uint32_t>(DefaultRounds);
m_paddingBytes = parserResults["PaddingBytes"].as<uint32_t>(DefaultPaddingBytes);
m_partitionId = parserResults["PartitionId"].as<std::string>(DefaultPartitionId);
m_maxTimeouts = parserResults["MaxTimeouts"].as<uint32_t>(DefaultMaxTimeouts);
m_verbose = parserResults["verbose"].as<bool>(false);
m_useSasCredential = parserResults["UseSasCredential"].as<bool>(true);
if (m_rounds == 0xffffffff)
{
m_rounds = (std::numeric_limits<uint32_t>::max)();
}
auto span{CreateStressSpan("Initialize")};
span.first->SetAttribute("NumberToSend", m_numberToSend);
span.first->SetAttribute("BatchSize", m_batchSize);
span.first->SetAttribute("BatchDuration", m_batchDuration.count());
span.first->SetAttribute("Prefetch", m_prefetchCount);
span.first->SetAttribute("Rounds", m_rounds);
span.first->SetAttribute("PaddingBytes", m_paddingBytes);
span.first->SetAttribute("PartitionId", m_partitionId);
span.first->SetAttribute("MaxTimeouts", m_maxTimeouts);
span.first->SetAttribute("Verbose", m_verbose);
span.first->SetAttribute("UseSasCredential", m_useSasCredential);
// m_sleepAfter =
// parserResults["SleepAfter"].as<std::chrono::system_clock::duration>(std::chrono::seconds(0));
m_eventHubName = Azure::Core::_internal::Environment::GetVariable("EVENTHUB_NAME");
if (m_eventHubName.empty())
{
GetLogger()->Fatal("Could not find required environment variable EVENTHUB_NAME");
std::cerr << "Missing required environment variable EVENTHUB_NAME" << std::endl;
throw std::runtime_error("Missing environment variable, aborting.");
}
m_eventHubNamespace = Azure::Core::_internal::Environment::GetVariable("EVENTHUBS_HOST");
if (m_eventHubNamespace.empty())
{
GetLogger()->Fatal("Could not find required environment variable EVENTHUBS_HOST");
std::cerr << "Missing required environment variable EVENTHUBS_HOST" << std::endl;
throw std::runtime_error("Missing environment variable, aborting.");
}
if (m_useSasCredential)
{
m_eventHubConnectionString
= Azure::Core::_internal::Environment::GetVariable("EVENTHUB_CONNECTION_STRING");
if (m_eventHubConnectionString.empty())
{
std::cerr << "Missing required environment variable EVENTHUB_CONNECTION_STRING" << std::endl;
GetLogger()->Fatal("Could not find required environment variable EVENTHUB_NAME");
throw std::runtime_error("Missing environment variable, aborting.");
}
m_checkpointStoreConnectionString
= Azure::Core::_internal::Environment::GetVariable("CHECKPOINT_STORE_CONNECTION_STRING");
}
}
void BatchStressTest::Run()
{
std::cout << "Run " << std::endl;
auto sendOutput = SendMessages();
std::cout << "Starting receive tests for partition " << m_partitionId << std::endl;
std::cout << " Start position: " << sendOutput.first << std::endl
<< " End position: " << sendOutput.second.LastEnqueuedSequenceNumber << std::endl;
ReceiveMessages(sendOutput.first);
}
void BatchStressTest::Cleanup() {}
std::pair<
Azure::Messaging::EventHubs::Models::StartPosition,
Azure::Messaging::EventHubs::Models::EventHubPartitionProperties>
BatchStressTest::SendMessages()
{
std::unique_ptr<Azure::Messaging::EventHubs::ProducerClient> producerClient;
if (m_useSasCredential)
{
producerClient = std::make_unique<Azure::Messaging::EventHubs::ProducerClient>(
m_eventHubConnectionString, m_eventHubName);
}
else
{
producerClient = std::make_unique<Azure::Messaging::EventHubs::ProducerClient>(
m_eventHubNamespace,
m_eventHubName,
std::make_shared<Azure::Identity::EnvironmentCredential>());
}
Azure::Core::Context context;
auto scopeGuard{
sg::make_scope_guard([&context, &producerClient]() { producerClient->Close(context); })};
try
{
EventSenderOptions senderOptions;
senderOptions.PartitionId = m_partitionId;
senderOptions.MessageLimit = m_numberToSend;
senderOptions.NumberOfExtraBytes = m_paddingBytes;
auto sendEventsResult
= EventSender::SendEventsToPartition(producerClient, senderOptions, context);
producerClient->Close(context);
return std::make_pair(sendEventsResult.first, sendEventsResult.second);
}
catch (std::exception const& ex)
{
GetTracer()->GetCurrentSpan()->AddEvent(
"Exception received", {{trace::SemanticConventions::kExceptionMessage, ex.what()}});
std::cerr << "Exception " << ex.what();
throw;
}
}
void BatchStressTest::ReceiveMessages(
Azure::Messaging::EventHubs::Models::StartPosition const& startPosition)
{
auto span{CreateStressSpan("ReceiveMessages")};
try
{
Azure::Core::Context context;
ConsumerClientOptions clientOptions;
clientOptions.ApplicationID = "StressConsumerClient";
std::unique_ptr<ConsumerClient> consumerClient;
if (m_useSasCredential)
{
consumerClient = std::make_unique<ConsumerClient>(
m_eventHubConnectionString, m_eventHubName, DefaultConsumerGroup, clientOptions);
}
else
{
consumerClient = std::make_unique<ConsumerClient>(
m_eventHubNamespace,
m_eventHubName,
std::make_shared<Azure::Identity::EnvironmentCredential>());
}
{
auto getPartitionPropertiesSpan{
CreateStressSpan("ReceiveMessages::GetPartitionProperties to warm up connection")};
auto consumerProperties = consumerClient->GetEventHubProperties(context);
}
for (auto round = 0u; round < m_rounds; round += 1)
{
std::cout << "Round " << round << std::endl;
auto consumeForTesterSpan{CreateStressSpan("ConsumeForBatchTester")};
consumeForTesterSpan.first->SetAttribute("Round", round);
ConsumeForBatchTester(round, *consumerClient, startPosition, context);
}
consumerClient->Close(context);
}
catch (std::exception const& ex)
{
GetTracer()->GetCurrentSpan()->AddEvent(
"Exception received", {{trace::SemanticConventions::kExceptionMessage, ex.what()}});
std::cerr << "Exception " << ex.what();
throw;
}
}
void BatchStressTest::ConsumeForBatchTester(
uint32_t round,
ConsumerClient& client,
Models::StartPosition const& startPosition,
Azure::Core::Context const& context) const
{
std::unique_ptr<PartitionClient> partitionClient;
{
auto span{CreateStressSpan("ConsumeForBatchTester::CreatePartitionClient")};
PartitionClientOptions partitionOptions;
partitionOptions.StartPosition = startPosition;
partitionOptions.Prefetch = m_prefetchCount;
partitionClient = std::make_unique<PartitionClient>(
client.CreatePartitionClient(m_partitionId, partitionOptions));
std::cout << "[r: " << round << "/" << m_rounds << "p: " << m_partitionId
<< "] Starting to receive messages from partition" << std::endl;
}
size_t total = 0;
uint32_t numCancels = 0;
constexpr const uint32_t cancelLimit = 5;
do
{
auto duration = std::chrono::system_clock::now() + m_batchDuration;
auto receiveContext = context.WithDeadline(duration);
try
{
auto span{CreateStressSpan("ConsumeForBatchTester::ReceiveEvents")};
auto events = partitionClient->ReceiveEvents(m_batchSize, receiveContext);
total += events.size();
std::cout << "Total: " << total << std::endl;
if (total >= m_numberToSend)
{
break;
}
}
catch (Azure::Messaging::EventHubs::EventHubsException& ex)
{
std::cerr << "Exception thrown while receiving messages." << ex.what() << std::endl;
if (!ex.IsTransient)
{
std::cerr << "Error is not transient, aborting test." << std::endl;
throw;
}
}
catch (Azure::Core::OperationCancelledException&)
{
numCancels += 1;
if (numCancels > cancelLimit)
{
std::cerr << "cancellation errors were received " << numCancels
<< " times in a row. Stoping test." << std::endl;
throw std::runtime_error("Too many cancellations received in a row, aborting test.");
}
else
{
std::cout << "received " << total << "/" << m_numberToSend << "then received error";
}
}
} while (true);
}

View File

@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "opentelemetry_helpers.hpp"
#include "eventhubs_stress_scenarios.hpp"
opentelemetry::nostd::shared_ptr<opentelemetry::logs::Logger> GetLogger()
{
auto logger{opentelemetry::logs::Provider::GetLoggerProvider()->GetLogger(EventHubsLoggerName)};
return logger;
}
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> GetTracer()
{
return opentelemetry::trace::Provider::GetTracerProvider()->GetTracer(EventHubsLoggerName);
}
std::pair<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>, opentelemetry::trace::Scope>
CreateStressSpan(std::string const& name)
{
auto tracer = GetTracer();
opentelemetry::trace::StartSpanOptions options;
options.parent = tracer->GetCurrentSpan()->GetContext();
options.kind = opentelemetry::trace::SpanKind::kClient;
auto newSpan = tracer->StartSpan(name, options);
auto scope{tracer->WithActiveSpan(newSpan)};
return std::make_pair<
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>,
opentelemetry::trace::Scope>(std::move(newSpan), std::move(scope));
}

View File

@ -124,5 +124,5 @@ resource storageAccountName_default_container 'Microsoft.Storage/storageAccounts
}
output EVENTHUB_NAME string = eventHubName
output EVENTHUB_CONNECTION_STRING string = listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', namespaceName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString
output CHECKPOINTSTORE_STORAGE_CONNECTION_STRING string = 'DefaultEndpointsProtocol=https;AccountName=${storageAccountName};AccountKey=${listKeys(storageAccount.id, storageApiVersion).keys[0].value};EndpointSuffix=${storageEndpointSuffix}'
output EVENTHUB_CONNECTION_STRING string = '"${listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', namespaceName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString}"'
output CHECKPOINTSTORE_STORAGE_CONNECTION_STRING string = '"DefaultEndpointsProtocol=https;AccountName=${storageAccountName};AccountKey=${listKeys(storageAccount.id, storageApiVersion).keys[0].value};EndpointSuffix=${storageEndpointSuffix}"'

View File

@ -6,17 +6,29 @@ metadata:
labels:
chaos: "{{ default false .Stress.chaos }}"
spec:
shareProcessNamespace: true
containers:
- name: otel-collector
# image: mcr.microsoft.com/oss/otel/opentelemetry-collector-contrib:0.94.0
image: stresspgs7b6dif73rup6.azurecr.io/stress/opentelemetry-collector-contrib-shell:0.94.0
imagePullPolicy: Always
resources:
limits:
memory: 500Mi
cpu: "0.5"
env:
- name: APPLICATIONINSIGHTS_CONNECTION_STRING
value: "TO BE FILLED IN LATER."
- name: main
image: {{ .Stress.imageTag }}
imagePullPolicy: Always
command:
[
"valgrind",
"--tool=memcheck",
"-s",
"./azure-messaging-eventhubs-stress-test",
]
command: ['bash', '-c']
args:
- |
set -a;
source $ENV_FILE;
./azure-messaging-eventhubs-stress-test produce BatchStressTest;
kill `pidof otelcol-contrib`;
{{- include "stress-test-addons.container-env" . | nindent 6 }}
resources:
limits:

View File

@ -3,6 +3,10 @@
"version-string": "1.0.0",
"dependencies": [
"azure-core-cpp",
"azure-core-amqp-cpp"
"azure-core-amqp-cpp",
{
"name": "opentelemetry-cpp",
"features": [ "otlp-http" ]
}
]
}

View File

@ -30,8 +30,8 @@ stages:
CtestRegex: "azure-messaging-eventhubs.*"
LiveTestCtestRegex: "azure-messaging-eventhubs.*"
LiveTestTimeoutInMinutes: 120
LineCoverageTarget: 40
BranchCoverageTarget: 20
LineCoverageTarget: 43
BranchCoverageTarget: 22
Artifacts:
- Name: azure-messaging-eventhubs
Path: azure-messaging-eventhubs

View File

@ -16,6 +16,7 @@
},
{
"name": "opentelemetry-cpp",
"features": ["otlp-http"],
"platform": "!(windows & !static)",
"version>=": "1.3.0"
},