From aadd25dcd1b229c14d53a17726c7721658a3e89a Mon Sep 17 00:00:00 2001 From: Larry Osterman Date: Thu, 22 Feb 2024 20:41:18 -0800 Subject: [PATCH] Reworked EventHubs Stress test to export information to OpenTelemetry (#5370) # Significant rewrite of Eventhubs stress test. ## Cleaned up implementation of EventHubs stress/reliability test * Reworked stress test layout to isolate stress test implementation from stress deployment elements. * Added ability of stress test scenarios to be independently configured. * Rewrote stress test app to be closer to the Go equivalent app. * added apparg.hpp for command line parsing * added scope_guard to test for scope_guard implementation. ## Added close methods to Consumer client and Producer client. They don't do much currently, but will eventually. ## AI Generated summary of pull request This pull request primarily focuses on the `azure-core-amqp` and `azure-messaging-eventhubs` packages, introducing changes to improve code efficiency and maintainability. The most significant changes include the removal of unnecessary `#include` directives in various files in the `azure-core-amqp` package, the addition of `Close` methods in `consumer_client.hpp` and `producer_client.hpp` in the `azure-messaging-eventhubs` package, and modifications to the `cgmanifest.json` files in the `azure-messaging-eventhubs-checkpointstore-blob` and `azure-messaging-eventhubs` directories. Removal of unnecessary `#include` directives: * [`sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp`](diffhunk://#diff-5acd7049cef5955540cc4253f264207e5a7d2701612e148c736ca5781e69d224L12-L14): Removed unnecessary `#include` directives for `iostream` and `sstream`. * [`sdk/core/azure-core-amqp/src/amqp/connection.cpp`](diffhunk://#diff-fc3a6e5b11f1254c4fd344bcd1846c83ad4ca8e2a8a23b7db0657c2846f43937L18): Removed unnecessary `#include` directive for `azure/core/url.hpp`. * [`sdk/core/azure-core-amqp/src/amqp/connection_string_credential.cpp`](diffhunk://#diff-14cf130dc5f0b51f698cca57724b733591d48fad0e5beb4745fc1cd78cbdaa72L7-L20): Removed unnecessary `#include` directives for `azure/core/amqp/internal/models/amqp_protocol.hpp`, `azure_c_shared_utility/azure_base64.h`, `azure_c_shared_utility/buffer_.h`, and `iostream`. * [`sdk/core/azure-core-amqp/src/amqp/link.cpp`](diffhunk://#diff-249643f29ca2c0b1226e9d22ce90be83c77105f55221a1092fb605a5c7ead356L10-L21): Removed unnecessary `#include` directives for `azure/core/amqp/internal/message_receiver.hpp`, `azure/core/amqp/internal/message_sender.hpp`, `azure/core/amqp/internal/models/messaging_values.hpp`, and `azure_uamqp_c/amqp_definitions_sequence_no.h`. * [`sdk/core/azure-core-amqp/src/amqp/management.cpp`](diffhunk://#diff-b03544340ff264961e045648c2c274dde9fa7ceb33b07b0c41a708128e28d40fL6-L16): Removed unnecessary `#include` directives for `azure/core/amqp/internal/connection.hpp`, `azure/core/amqp/internal/session.hpp`, and `azure_uamqp_c/amqp_management.h`. Addition of `Close` methods: * [`sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp`](diffhunk://#diff-fad00d7bec0f12ef5e7d36387e866ab5291c6f5f57568845dfabe3f23320c899R152-R159): Added a `Close` method to the `ConsumerClient` class. * [`sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp`](diffhunk://#diff-cba1fbd1d786b763c27123a2284476e1cc8093753abe6e3a6af9ca78aae4d594R96-R113): Added a `Close` method to the `ProducerClient` class. Modification of `cgmanifest.json` files: * [`sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/cgmanifest.json`](diffhunk://#diff-465b096c14ed240a8f14180f16e0924e3df69bd1c95934dca435c0b3c97468a0R1-R37): Added a new `cgmanifest.json` file specifying development dependencies. * [`sdk/eventhubs/azure-messaging-eventhubs/cgmanifest.json`](diffhunk://#diff-f3e86a1f7be148625ac80b2151504461f3faff4d0564db588c6df9d0d9eb0986R1-R48): Added a new `cgmanifest.json` file specifying development dependencies. * --------- Co-authored-by: Rick Winter --- .vscode/cspell.json | 14 +- .../src/amqp/claim_based_security.cpp | 3 - .../azure-core-amqp/src/amqp/connection.cpp | 1 - .../src/amqp/connection_string_credential.cpp | 4 - sdk/core/azure-core-amqp/src/amqp/link.cpp | 6 - .../azure-core-amqp/src/amqp/management.cpp | 4 - .../src/amqp/message_receiver.cpp | 6 - .../src/amqp/message_sender.cpp | 4 - .../src/amqp/private/connection_impl.hpp | 1 - .../src/amqp/private/link_impl.hpp | 1 - .../amqp/private/message_receiver_impl.hpp | 6 - .../src/amqp/private/message_sender_impl.hpp | 3 - .../src/amqp/private/session_impl.hpp | 2 - .../vcpkg.json | 5 +- .../cgmanifest.json | 37 + .../azure-messaging-eventhubs/cgmanifest.json | 48 + .../messaging/eventhubs/consumer_client.hpp | 8 + .../messaging/eventhubs/producer_client.hpp | 20 +- .../src/consumer_client.cpp | 9 + .../test/eventhubs-stress-test/CMakeLists.txt | 24 +- .../test/eventhubs-stress-test/Chart.lock | 6 - .../test/eventhubs-stress-test/Dockerfile | 6 +- .../test/eventhubs-stress-test/README.md | 2 +- .../eventhubs_stress_test.cpp | 241 --- .../scenarios-matrix.yaml | 2 +- .../eventhubs-stress-test/src/CMakeLists.txt | 60 + .../src/eventhubs_stress_test.cpp | 296 ++++ .../eventhubs-stress-test/src/inc/argagg.hpp | 1541 +++++++++++++++++ .../src/inc/eventhubs_stress_scenarios.hpp | 61 + .../src/scenarios/inc/batch_stress_tests.hpp | 66 + .../src/scenarios/inc/event_sender.hpp | 114 ++ .../scenarios/inc/opentelemetry_helpers.hpp | 18 + .../src/scenarios/inc/scope_guard.hpp | 184 ++ .../src/scenarios/src/batch_stress_tests.cpp | 357 ++++ .../scenarios/src/opentelemetry_helpers.cpp | 33 + .../stress-test-resources.bicep | 4 +- .../templates/deploy-job.yaml | 26 +- .../azure-messaging-eventhubs/vcpkg.json | 6 +- sdk/eventhubs/ci.yml | 4 +- vcpkg.json | 1 + 40 files changed, 2907 insertions(+), 327 deletions(-) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/cgmanifest.json create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/cgmanifest.json delete mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Chart.lock delete mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/CMakeLists.txt create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/eventhubs_stress_test.cpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/argagg.hpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/eventhubs_stress_scenarios.hpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/batch_stress_tests.hpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/event_sender.hpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/opentelemetry_helpers.hpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/scope_guard.hpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/batch_stress_tests.cpp create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/opentelemetry_helpers.cpp diff --git a/.vscode/cspell.json b/.vscode/cspell.json index c389a7584..89c103fdd 100644 --- a/.vscode/cspell.json +++ b/.vscode/cspell.json @@ -343,6 +343,17 @@ "Sylvain" ] }, + { + "filename": "**/sdk/eventhubs/azure-messaging-eventhubs/**/*", + "words": [ + "otlp", + "Otlp", + "OTLP", + "exportings", + "ricab", + "noarg" + ] + }, { "filename": "**/sdk/identity/azure-identity/**", "words": [ @@ -413,7 +424,8 @@ { "filename": "**/vcpkg.json", "words": [ - "umock" + "umock", + "otlp" ] } ], diff --git a/sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp b/sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp index 425d89e16..35e78cfb3 100644 --- a/sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/claim_based_security.cpp @@ -9,9 +9,6 @@ #include #include -#include -#include - using namespace Azure::Core::Diagnostics::_internal; using namespace Azure::Core::Diagnostics; diff --git a/sdk/core/azure-core-amqp/src/amqp/connection.cpp b/sdk/core/azure-core-amqp/src/amqp/connection.cpp index 1111f2d55..e08ffee1e 100644 --- a/sdk/core/azure-core-amqp/src/amqp/connection.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/connection.cpp @@ -15,7 +15,6 @@ #include #include -#include #include #include diff --git a/sdk/core/azure-core-amqp/src/amqp/connection_string_credential.cpp b/sdk/core/azure-core-amqp/src/amqp/connection_string_credential.cpp index 461a9245d..9e35d0c1c 100644 --- a/sdk/core/azure-core-amqp/src/amqp/connection_string_credential.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/connection_string_credential.cpp @@ -4,20 +4,16 @@ #include "azure/core/amqp/internal/connection_string_credential.hpp" #include "azure/core/amqp/internal/connection.hpp" -#include "azure/core/amqp/internal/models/amqp_protocol.hpp" #include "azure/core/amqp/internal/network/socket_transport.hpp" #include #include -#include -#include #include #include #include #include -#include #include #include #include diff --git a/sdk/core/azure-core-amqp/src/amqp/link.cpp b/sdk/core/azure-core-amqp/src/amqp/link.cpp index f5eedf236..44a05d594 100644 --- a/sdk/core/azure-core-amqp/src/amqp/link.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/link.cpp @@ -7,18 +7,12 @@ #include "../models/private/performatives/transfer_impl.hpp" #include "../models/private/value_impl.hpp" #include "azure/core/amqp/internal/common/completion_operation.hpp" -#include "azure/core/amqp/internal/message_receiver.hpp" -#include "azure/core/amqp/internal/message_sender.hpp" #include "azure/core/amqp/internal/models/message_source.hpp" #include "azure/core/amqp/internal/models/message_target.hpp" -#include "azure/core/amqp/internal/models/messaging_values.hpp" #include "private/link_impl.hpp" #include "private/session_impl.hpp" -#include - #include -#include namespace Azure { namespace Core { namespace Amqp { namespace _detail { #if defined(TESTING_BUILD) diff --git a/sdk/core/azure-core-amqp/src/amqp/management.cpp b/sdk/core/azure-core-amqp/src/amqp/management.cpp index 2b1f4fa5c..517deb4bc 100644 --- a/sdk/core/azure-core-amqp/src/amqp/management.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/management.cpp @@ -3,9 +3,7 @@ #include "azure/core/amqp/internal/management.hpp" -#include "azure/core/amqp/internal/connection.hpp" #include "azure/core/amqp/internal/models/messaging_values.hpp" -#include "azure/core/amqp/internal/session.hpp" #include "azure/core/amqp/models/amqp_message.hpp" #include "private/connection_impl.hpp" #include "private/management_impl.hpp" @@ -13,8 +11,6 @@ #include #include -#include - #include #include #include diff --git a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp index 06973936d..2762bb5d3 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp @@ -8,24 +8,18 @@ #include "../models/private/message_impl.hpp" #include "../models/private/value_impl.hpp" -#include "azure/core/amqp/internal/connection.hpp" -#include "azure/core/amqp/internal/connection_string_credential.hpp" #include "azure/core/amqp/internal/link.hpp" #include "azure/core/amqp/internal/models/messaging_values.hpp" -#include "azure/core/amqp/internal/session.hpp" #include "azure/core/amqp/models/amqp_message.hpp" #include "private/message_receiver_impl.hpp" -#include #include #include #include #include -#include #include -#include using namespace Azure::Core::Diagnostics::_internal; using namespace Azure::Core::Diagnostics; diff --git a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp index 5240bf7ce..b58c2ff28 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp @@ -7,16 +7,12 @@ #include "../models/private/error_impl.hpp" #include "../models/private/message_impl.hpp" #include "../models/private/value_impl.hpp" -#include "azure/core/amqp/internal/claims_based_security.hpp" #include "azure/core/amqp/internal/common/completion_operation.hpp" -#include "azure/core/amqp/internal/models/messaging_values.hpp" -#include "azure/core/amqp/internal/session.hpp" #include "azure/core/amqp/models/amqp_message.hpp" #include "private/connection_impl.hpp" #include "private/message_sender_impl.hpp" #include "private/session_impl.hpp" -#include #include #include #include diff --git a/sdk/core/azure-core-amqp/src/amqp/private/connection_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/connection_impl.hpp index 7a512a614..413e8f6f3 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/connection_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/connection_impl.hpp @@ -9,7 +9,6 @@ #include "unique_handle.hpp" #include -#include #include diff --git a/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp index 1a1d81fb6..8b55565ef 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp @@ -13,7 +13,6 @@ #include -#include #include #include #include diff --git a/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp index cd7cd1266..0f4c479bb 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp @@ -4,21 +4,15 @@ #pragma once #include "azure/core/amqp/internal/message_receiver.hpp" -#include "claims_based_security_impl.hpp" -#include "connection_impl.hpp" #include "link_impl.hpp" -#include "message_receiver_impl.hpp" #include "session_impl.hpp" #include "unique_handle.hpp" -#include - #include #include #include #include -#include namespace Azure { namespace Core { namespace Amqp { namespace _detail { template <> struct UniqueHandleHelper diff --git a/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp index c61f91904..f33df78d6 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp @@ -4,14 +4,11 @@ #pragma once #include "azure/core/amqp/internal/message_sender.hpp" -#include "claims_based_security_impl.hpp" #include "link_impl.hpp" #include "unique_handle.hpp" #include -#include - namespace Azure { namespace Core { namespace Amqp { namespace _detail { template <> struct UniqueHandleHelper { diff --git a/sdk/core/azure-core-amqp/src/amqp/private/session_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/session_impl.hpp index 4ff8440e2..af3b09f69 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/session_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/session_impl.hpp @@ -10,10 +10,8 @@ #include -#include #include #include -#include namespace Azure { namespace Core { namespace Amqp { namespace _detail { template <> struct UniqueHandleHelper diff --git a/sdk/core/azure-core-tracing-opentelemetry/vcpkg.json b/sdk/core/azure-core-tracing-opentelemetry/vcpkg.json index d1166606e..e79bbd4c3 100644 --- a/sdk/core/azure-core-tracing-opentelemetry/vcpkg.json +++ b/sdk/core/azure-core-tracing-opentelemetry/vcpkg.json @@ -4,10 +4,7 @@ "supports": "!(windows & !static)", "dependencies": [ "azure-core-cpp", - { - "name": "opentelemetry-cpp", - "platform": "!(windows & !static)" - }, + "opentelemetry-cpp", { "name": "vcpkg-cmake", "host": true diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/cgmanifest.json b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/cgmanifest.json new file mode 100644 index 000000000..1b872bc72 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/cgmanifest.json @@ -0,0 +1,37 @@ +{ + "$schema": "https://json.schemastore.org/component-detection-manifest.json", + "Registrations": [ + { + "Component": { + "Type": "git", + "git": { + "RepositoryUrl": "https://github.com/google/googletest", + "CommitHash": "703bd9caab50b139428cea1aaff9974ebee5742e" + } + }, + "DevelopmentDependency": true + }, + { + "Component": { + "Type": "other", + "Other": { + "Name": "clang-format", + "Version": "9.0.0-2", + "DownloadUrl": "https://ubuntu.pkgs.org/18.04/ubuntu-updates-universe-amd64/clang-format-9_9-2~ubuntu18.04.2_amd64.deb.html" + } + }, + "DevelopmentDependency": true + }, + { + "Component": { + "Type": "other", + "Other": { + "Name": "doxygen", + "Version": "1.8.20", + "DownloadUrl": "http://doxygen.nl/files/doxygen-1.8.20-setup.exe" + } + }, + "DevelopmentDependency": true + } + ] +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/cgmanifest.json b/sdk/eventhubs/azure-messaging-eventhubs/cgmanifest.json new file mode 100644 index 000000000..e0eee0b7f --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/cgmanifest.json @@ -0,0 +1,48 @@ +{ + "$schema": "https://json.schemastore.org/component-detection-manifest.json", + "Registrations": [ + { + "Component": { + "Type": "git", + "git": { + "RepositoryUrl": "https://github.com/google/googletest", + "CommitHash": "703bd9caab50b139428cea1aaff9974ebee5742e" + } + }, + "DevelopmentDependency": true + }, + { + "Component": { + "Type": "git", + "git": { + "RepositoryUrl": "https://github.com/ricab/scope_guard", + "CommitHash": "71a04528184db1749dd08ebbbf4daf3b5dca21fd", + "Branch": "master" + } + }, + "DevelopmentDependency": true + }, + { + "Component": { + "Type": "other", + "Other": { + "Name": "clang-format", + "Version": "9.0.0-2", + "DownloadUrl": "https://ubuntu.pkgs.org/18.04/ubuntu-updates-universe-amd64/clang-format-9_9-2~ubuntu18.04.2_amd64.deb.html" + } + }, + "DevelopmentDependency": true + }, + { + "Component": { + "Type": "other", + "Other": { + "Name": "doxygen", + "Version": "1.8.20", + "DownloadUrl": "http://doxygen.nl/files/doxygen-1.8.20-setup.exe" + } + }, + "DevelopmentDependency": true + } + ] +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp index 81b706308..f06db1574 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp @@ -149,6 +149,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { PartitionClientOptions const& options = {}, Azure::Core::Context const& context = {}); + /** + * @brief Closes the consumer client canceling any operations outstanding on any of the existing + * partition clients. + * + * @param context The context for the operation can be used for request cancellation. + */ + void Close(Azure::Core::Context const& context); + /**@brief GetEventHubProperties gets properties of an eventHub. This includes data * like name, and partitions. * diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp index b683904af..e5f9c0dfa 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp @@ -92,13 +92,29 @@ namespace Azure { namespace Messaging { namespace EventHubs { std::shared_ptr credential, ProducerClientOptions options = {}); - ~ProducerClient() + ~ProducerClient() {} + + /** @brief Close all the connections and sessions. + * + * @param context Context for the operation can be used for request cancellation. + */ + void Close(Azure::Core::Context const& context = {}) { for (auto& sender : m_senders) { - sender.second.Close(); + sender.second.Close(context); } m_senders.clear(); + // Other possible things we might want to do in close, but cannot quite do yet because it + // doesn't necessarily work correctly. + // for (auto& session : m_sessions) + // { + // session.second.Close(context); + // } + // for (auto& connection : m_connections) + // { + // connection.second.Close(context); + // } } /** @brief Create a new EventDataBatch to be sent to the Event Hub. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp index 47979b1d9..1a79d8c21 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp @@ -67,6 +67,15 @@ namespace Azure { namespace Messaging { namespace EventHubs { }; } + void ConsumerClient::Close(Azure::Core::Context const& context) + { + for (auto& sender : m_receivers) + { + sender.second.Close(context); + } + m_receivers.clear(); + } + Azure::Core::Amqp::_internal::Connection ConsumerClient::CreateConnection( std::string const& partitionId) const { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/CMakeLists.txt b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/CMakeLists.txt index ab6ec741a..2f9c80ff0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/CMakeLists.txt +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/CMakeLists.txt @@ -1,19 +1,13 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -cmake_minimum_required(VERSION 3.13) +# CMakeList.txt : Top-level CMake project file, do global configuration +# and include sub-projects here. +# +cmake_minimum_required (VERSION 3.13) -project(azure-messaging-eventhubs-stress-test LANGUAGES CXX) -set(CMAKE_CXX_STANDARD 14) -set(CMAKE_CXX_STANDARD_REQUIRED True) - -add_executable( - azure-messaging-eventhubs-stress-test - eventhubs_stress_test.cpp -) - -target_link_libraries(azure-messaging-eventhubs-stress-test PRIVATE azure-messaging-eventhubs azure-core azure-identity) - -create_map_file(azure-messaging-eventhubs-stress-test azure-messaging-eventhubs-stress-test.map) -file(COPY ${CMAKE_CURRENT_BINARY_DIR} - DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin) +# Include sub-projects. +if (BUILD_TESTING) +# stress tests are categorized as normal tests. +add_subdirectory ("src") +endif() diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Chart.lock b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Chart.lock deleted file mode 100644 index acdc794c7..000000000 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Chart.lock +++ /dev/null @@ -1,6 +0,0 @@ -dependencies: -- name: stress-test-addons - repository: https://stresstestcharts.blob.core.windows.net/helm/ - version: 0.2.0 -digest: sha256:59fff3930e78c4ca9f9c0120433c7695d31db63f36ac61d50abcc91b1f1835a0 -generated: "2023-07-24T13:06:32.3351603-07:00" diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Dockerfile b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Dockerfile index cecca18e2..704020e39 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Dockerfile +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/Dockerfile @@ -6,7 +6,7 @@ FROM mcr.microsoft.com/mirror/docker/library/ubuntu:22.04 as build # install the mem check tool along side the other deps RUN apt-get update -y -RUN apt-get install -y gcc cmake make g++ git zip unzip build-essential pkg-config wget curl valgrind +RUN apt-get install -y gcc cmake make g++ git zip unzip build-essential pkg-config wget curl valgrind python3 RUN wget -O vcpkg.tar.gz https://github.com/microsoft/vcpkg/archive/master.tar.gz RUN mkdir /opt/vcpkg RUN tar xf vcpkg.tar.gz --strip-components=1 -C /opt/vcpkg @@ -22,7 +22,7 @@ WORKDIR /build # version of the packages that will be fetched. # So when building from root we need to match the two values. When not building from root if the vcpkg file # does not specify a baseline the value set in the cmake file will ensure that we are at the desired level. -RUN cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=ON -DBUILD_TRANSPORT_CURL=ON /src +RUN --mount=type=cache,target=/root/.cache/vcpkg/archives --mount=type=cache,target=/root/.m2 cmake -DCMAKE_BUILD_TYPE=Release -DBUILD_TESTING=ON -DBUILD_TRANSPORT_CURL=ON /src RUN cmake --build . --target azure-messaging-eventhubs-stress-test FROM mcr.microsoft.com/mirror/docker/library/ubuntu:22.04 @@ -32,7 +32,7 @@ RUN apt-get install -y valgrind WORKDIR / # copy the target binary -COPY --from=build ./build/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/azure-messaging-eventhubs-stress-test ./azure-messaging-eventhubs-stress-test +COPY --from=build ./build/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/azure-messaging-eventhubs-stress-test ./azure-messaging-eventhubs-stress-test RUN chmod +x ./azure-messaging-eventhubs-stress-test CMD ./azure-messaging-eventhubs-stress-test diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/README.md b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/README.md index e6f70026b..93adf4429 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/README.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/README.md @@ -26,4 +26,4 @@ Obviously after logging in to the acr "az acr login -n " To use another image you will need to go to line 12 in deploy job and update with your new file. -Once the deploy succeeds run " kubectl logs -n azuresdkforcpp -f libcurl-stress-test" to grab the logs in real time . \ No newline at end of file +After deployment succeeds, run " kubectl logs -n azuresdkforcpp -f eventhubs-stress-test" to grab the logs in real time . \ No newline at end of file diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp deleted file mode 100644 index c666d21ec..000000000 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -/** - * @brief Validates the Azure Core transport adapters with fault responses from server. - * - * @note This test requires the Http-fault-injector - * (https://github.com/Azure/azure-sdk-tools/tree/main/tools/http-fault-injector) running. Follow - * the instructions to install and run the server before running this test. - * - */ - -#define REQUESTS 100 -#define WARMUP 100 -#define ROUNDS 100 - -#include -#include -#include -#include -#include - -#include - -using namespace Azure::Messaging::EventHubs; - -class EventHubsStress { -public: - EventHubsStress() - { - m_eventHubName = Azure::Core::_internal::Environment::GetVariable("EVENTHUB_NAME"); - m_eventHubConnectionString - = Azure::Core::_internal::Environment::GetVariable("EVENTHUB_CONNECTION_STRING"); - m_checkpointStoreConnectionString - = Azure::Core::_internal::Environment::GetVariable("CHECKPOINT_STORE_CONNECTION_STRING"); - - m_numberToSend = 100; - m_batchSize = 100; - m_prefetchCount = 10; - m_messageBodySize = 1024; - - m_tenantId = Azure::Core::_internal::Environment::GetVariable("AZURE_TENANT_ID"); - m_clientId = Azure::Core::_internal::Environment::GetVariable("AZURE_CLIENT_ID"); - m_secret = Azure::Core::_internal::Environment::GetVariable("AZURE_CLIENT_SECRET"); - - if (m_eventHubConnectionString.empty()) - { - m_credential = std::make_shared( - m_tenantId, m_clientId, m_secret); - - m_client = std::make_unique( - m_eventHubConnectionString, m_eventHubName, m_credential); - } - else - { - m_client = std::make_unique( - m_eventHubConnectionString, m_eventHubName); - } - } - - void Warmup(int repetitions) - { - for (int i = 0; i < repetitions; i++) - { - std::cout << "Warmup " << i << std::endl; - SendMessages(); - ReceiveMessages(); - } - } - void Run(int repetitions) - { - for (int i = 0; i < repetitions; i++) - { - std::cout << "Run " << i << std::endl; - SendMessages(); - ReceiveMessages(); - } - } - void Cleanup() {} - -private: - std::string m_eventHubName; - std::string m_eventHubConnectionString; - std::string m_checkpointStoreConnectionString; - std::string m_partitionId{"0"}; - - std::string m_tenantId; - std::string m_clientId; - std::string m_secret; - - uint32_t m_numberToSend; - uint32_t m_batchSize; - uint32_t m_prefetchCount; - size_t m_messageBodySize; - - int m_rounds{10}; - - std::unique_ptr m_client; - std::shared_ptr m_credential; - - Models::StartPosition m_receiveStartPosition; - - void SendEventsToPartition(Azure::Core::Context const& context) - { - auto beforeSendProps = m_client->GetPartitionProperties(m_partitionId, context); - std::vector bodyData(m_messageBodySize, 'a'); - - Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; - batchOptions.PartitionId = m_partitionId; - Azure::Messaging::EventHubs::EventDataBatch batch(m_client->CreateBatch(batchOptions)); - for (uint32_t j = 0; j < m_numberToSend; ++j) - { - - Azure::Messaging::EventHubs::Models::EventData event; - event.Body = bodyData; - event.Properties["Number"] = j; - event.Properties["PartitionId"] - = static_cast(m_partitionId); - AddEndProperty(event, m_numberToSend); - batch.TryAddMessage(event); - } - m_client->Send(batch, context); - - auto afterSendProps = m_client->GetPartitionProperties(m_partitionId, context); - - m_receiveStartPosition.Inclusive = false; - m_receiveStartPosition.SequenceNumber = beforeSendProps.LastEnqueuedSequenceNumber; - } - - void AddEndProperty(Azure::Messaging::EventHubs::Models::EventData& event, uint64_t expectedCount) - { - event.Properties["End"] = expectedCount; - } - void SendMessages() - { - try - { - Azure::Core::Context context; - SendEventsToPartition(context); - } - catch (std::exception const& ex) - { - std::cerr << "Exception " << ex.what(); - throw; - } - } - void ReceiveMessages() - { - - try - { - Azure::Core::Context context; - ConsumerClientOptions clientOptions; - clientOptions.ApplicationID = "StressConsumerClient"; - - ConsumerClient consumerClient( - m_eventHubConnectionString, m_eventHubName, DefaultConsumerGroup, clientOptions); - - auto consumerProperties = consumerClient.GetEventHubProperties(context); - - std::cout << "Starting receive tests for partition " << m_partitionId << std::endl; - std::cout << " Start position: " << m_receiveStartPosition << std::endl; - - for (auto round = 0; round < m_rounds; round += 1) - { - ConsumeForBatchTester(round, consumerClient, m_receiveStartPosition, context); - } - } - catch (std::exception const& ex) - { - std::cerr << "Exception " << ex.what(); - throw; - } - } - void ConsumeForBatchTester( - uint32_t round, - ConsumerClient& client, - Models::StartPosition const& startPosition, - Azure::Core::Context const& context) - { - - PartitionClientOptions partitionOptions; - partitionOptions.StartPosition = startPosition; - partitionOptions.Prefetch = m_prefetchCount; - PartitionClient partitionClient{client.CreatePartitionClient(m_partitionId, partitionOptions)}; - std::cout << "[r: " << round << "/" << m_rounds << "p: " << m_partitionId - << "] Starting to receive messages from partition" << std::endl; - - size_t total = 0; - // uint32_t numCancels = 0; - // constexpr const uint32_t cancelLimit = 5; - - auto events = partitionClient.ReceiveEvents(m_batchSize, context); - total += events.size(); - std::cout << "Total: " << total << std::endl; - } -}; - -int main(int argc, char**) -{ - try - { - - EventHubsStress stressTest; - // some param was passed to the program, doesn't matter what it is, - // it is meant for the moment to just run a quick iteration to check for sanity of the test. - // since prototype TODO: pass in warmup/rounds/requests as params. - if (argc != 1) - { - std::cout << "--------------\tBUILD TEST\t--------------" << std::endl; - stressTest.Warmup(1); - stressTest.Run(5); - stressTest.Cleanup(); - std::cout << "--------------\tEND BUILD TEST\t--------------" << std::endl; - return 0; - } - - std::cout << "--------------\tSTARTING TEST\t--------------" << std::endl; - std::cout << "--------------\tPRE WARMUP\t--------------" << std::endl; - - stressTest.Warmup(WARMUP); - - std::cout << "--------------\tPOST WARMUP\t--------------" << std::endl; - - for (int i = 0; i < ROUNDS; i++) - { - std::cout << "--------------\tTEST ITERATION:" << i << "\t--------------" << std::endl; - - stressTest.Run(REQUESTS); - - std::cout << "--------------\tDONE ITERATION:" << i << "\t--------------" << std::endl; - } - - stressTest.Cleanup(); - } - catch (std::exception const& ex) - { - std::cerr << "Test failed due to exception thrown: " << ex.what() << std::endl; - } - return 0; -} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/scenarios-matrix.yaml b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/scenarios-matrix.yaml index 8a1e49b48..260af35a2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/scenarios-matrix.yaml +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/scenarios-matrix.yaml @@ -10,7 +10,7 @@ matrix: image: Dockerfile imageBuildDir: "../../../../../" scenarios: - constantDetach: + produceConsumeEvents: testTarget: azure-messaging-eventhubs-stress-test memory: "1.5Gi" diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/CMakeLists.txt b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/CMakeLists.txt new file mode 100644 index 000000000..9328d20d4 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/CMakeLists.txt @@ -0,0 +1,60 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +# cspell: words otlp + +cmake_minimum_required(VERSION 3.13) + +project(azure-messaging-eventhubs-stress-test LANGUAGES CXX) +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED True) + +set(WITH_STL ON) + +find_package(opentelemetry-cpp CONFIG REQUIRED) +find_package(protobuf) +find_package(nlohmann_json) +find_package(CURL) + +set(INCLUDE_FILES + inc/eventhubs_stress_scenarios.hpp + scenarios/inc/event_sender.hpp + scenarios/inc/opentelemetry_helpers.hpp + scenarios/inc/batch_stress_tests.hpp + ) + + +set(SOURCE_FILES + eventhubs_stress_test.cpp + scenarios/src/opentelemetry_helpers.cpp + scenarios/src/batch_stress_tests.cpp + ) + + +add_executable( + azure-messaging-eventhubs-stress-test + ${SOURCE_FILES} ${INCLUDE_FILES} +) + +# Include the headers from the project. +target_include_directories( + azure-messaging-eventhubs-stress-test + PUBLIC + $ + $ +) + +target_link_libraries(azure-messaging-eventhubs-stress-test +PRIVATE +azure-messaging-eventhubs +azure-identity +opentelemetry-cpp::ostream_span_exporter +opentelemetry-cpp::in_memory_span_exporter +opentelemetry-cpp::otlp_http_exporter +opentelemetry-cpp::otlp_http_log_record_exporter +opentelemetry-cpp::sdk +) + +create_map_file(azure-messaging-eventhubs-stress-test azure-messaging-eventhubs-stress-test.map) +file(COPY ${CMAKE_CURRENT_BINARY_DIR} + DESTINATION ${CMAKE_CURRENT_SOURCE_DIR}/bin) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/eventhubs_stress_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/eventhubs_stress_test.cpp new file mode 100644 index 000000000..0a37e9a8a --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/eventhubs_stress_test.cpp @@ -0,0 +1,296 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @brief Stress framework for EventHubs service client. + * + */ + +#include "argagg.hpp" +#include "batch_stress_tests.hpp" +#include "eventhubs_stress_scenarios.hpp" +#include "opentelemetry/sdk/logs/simple_log_record_processor_factory.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace trace_sdk = opentelemetry::sdk::trace; +namespace trace = opentelemetry::trace; +namespace logs_sdk = opentelemetry::sdk::logs; +namespace logs = opentelemetry::logs; +namespace otlp = opentelemetry::exporter::otlp; +namespace internal_log = opentelemetry::sdk::common::internal_log; + +void InitTracer() +{ + opentelemetry::exporter::otlp::OtlpHttpExporterOptions opts; + + // Create OTLP exporter instance + auto exporter = otlp::OtlpHttpExporterFactory::Create(opts); + auto processor = trace_sdk::SimpleSpanProcessorFactory::Create(std::move(exporter)); + std::shared_ptr provider + = trace_sdk::TracerProviderFactory::Create(std::move(processor)); + // Set the global trace provider + trace::Provider::SetTracerProvider(provider); +} + +// On debug builds, we log to the console. On release builds, we log to OpenTelemetry. +#if defined(_DEBUG) +constexpr const bool LogDefault = true; +#else +constexpr const bool LogDefault = false; +#endif + +bool LogToConsole{LogDefault}; + +void InitLogger() +{ + if (LogToConsole) + { + std::cout << "Using console to export log records." << std::endl; + } + else + { + opentelemetry::exporter::otlp::OtlpHttpLogRecordExporterOptions logger_opts; + + std::cout << "Using " << logger_opts.url << " to export log records." << std::endl; + logger_opts.console_debug = false; + + // Create OTLP exporter instance + auto exporter = otlp::OtlpHttpLogRecordExporterFactory::Create(logger_opts); + auto processor = logs_sdk::SimpleLogRecordProcessorFactory::Create(std::move(exporter)); + std::shared_ptr provider + = logs_sdk::LoggerProviderFactory::Create(std::move(processor)); + + // Set the global log provider. + logs::Provider::SetLoggerProvider(provider); + + // Integrate the azure logging diagnostics with the OpenTelemetry logger provider we just + // created. + Azure::Core::Diagnostics::Logger::SetListener( + [](Azure::Core::Diagnostics::Logger::Level level, std::string const& message) { + logs::Severity logSeverity{logs::Severity::kInvalid}; + switch (level) + { + case Azure::Core::Diagnostics::Logger::Level::Error: + logSeverity = logs::Severity::kError; + break; + case Azure::Core::Diagnostics::Logger::Level::Informational: + logSeverity = logs::Severity::kInfo; + break; + case Azure::Core::Diagnostics::Logger::Level::Verbose: + logSeverity = logs::Severity::kDebug; + break; + case Azure::Core::Diagnostics::Logger::Level::Warning: + logSeverity = logs::Severity::kWarn; + break; + } + logs::Provider::GetLoggerProvider() + ->GetLogger(EventHubsLoggerName) + ->Log(logSeverity, message); + }); + + internal_log::GlobalLogHandler::SetLogLevel(internal_log::LogLevel::Error); + } +} + +void CleanupTracer() +{ + // We call ForceFlush to prevent to cancel running exports, It's optional. + opentelemetry::nostd::shared_ptr provider + = trace::Provider::GetTracerProvider(); + if (provider) + { + static_cast(provider.get())->ForceFlush(); + } + + std::shared_ptr none; + trace::Provider::SetTracerProvider(none); +} + +void Usage( + argagg::parser const& argparser, + const std::vector>& scenarios) +{ + argagg::fmt_ostream fmt(std::cerr); + fmt << "Usage azure-messaging-eventhubs-stress-test [options] " << std::endl << argparser; + + fmt << std::endl; + fmt << "Scenario Options:" << std::endl; + for (const auto& scenario : scenarios) + { + fmt << "Scenario: " << scenario->GetStressScenarioName() << std::endl; + for (const auto& option : scenario->GetScenarioOptions()) + { + fmt << " "; + for (auto& arg : option.Activators) + { + fmt << arg; + if (arg != option.Activators.back()) + { + fmt << ", "; + } + } + fmt << "\n " << option.HelpMessage << '\n'; + } + } +} + +bool CompareString(std::string lhs, std::string rhs) +{ + std::transform(lhs.begin(), lhs.end(), lhs.begin(), tolower); + std::transform(rhs.begin(), rhs.end(), rhs.begin(), tolower); + return lhs == rhs; +} + +int main(int argc, char** argv) +{ + try + { + std::vector> scenarios{ + std::make_shared(), + }; + + // Determine the stress scenario to run. + // Parse the command line in "positional only" mode. The first argument is the scenario name. + std::shared_ptr scenario; + { + argagg::parser argparser; + + argagg::parser_results args; + try + { + args = argparser.parse(argc, argv, true); + } + catch (const std::exception& e) + { + std::cerr << "Exception thrown parsing command line: " << e.what(); + + Usage(argparser, scenarios); + + return -1; + } + + std::string scenarioName; + if (args.pos.size() > 0) + { + scenarioName = args.pos[0]; + } + else + { + std::cerr << "No scenario name provided." << std::endl; + Usage(argparser, scenarios); + return -1; + } + + for (const auto& scenarioToCheck : scenarios) + { + if (CompareString(scenarioToCheck->GetStressScenarioName(), scenarioName)) + { + scenario = scenarioToCheck; + break; + } + } + if (!scenario) + { + std::cerr << "Unknown scenario name " << scenarioName << "." << std::endl; + std::cerr << "Known scenarios are:" << std::endl; + for (const auto& scenarioToCheck : scenarios) + { + std::cerr << " " << scenarioToCheck->GetStressScenarioName(); + } + + Usage(argparser, scenarios); + return -1; + } + } + + std::cout << "Running stress scenario " << scenario->GetStressScenarioName() << std::endl; + + // Now we know the scenario, reparse the command line using the scenario specific options. We + // also support + auto scenarioOptions{scenario->GetScenarioOptions()}; + argagg::parser argparser{{ + {"console", {"--console"}, "Log output traces to console", 0}, + {"help", {"-?", "-h", "--help"}, "This help message.", 0}, + {"verbose", {"-v", "--verbose"}, "Enable verbose logging", 0}, + }}; + + // Add the scenario specific options to the parser. + for (const auto& option : scenarioOptions) + { + argparser.definitions.push_back( + {option.Name, option.Activators, option.HelpMessage, option.ExpectedArgs}); + } + + // Re-parse the command line with this scenario's options. + argagg::parser_results args; + try + { + args = argparser.parse(argc, argv); + } + catch (const std::exception& e) + { + std::cerr << "Exception thrown parsing command line: " << e.what(); + + Usage(argparser, scenarios); + + return -1; + } + + // Log to the console or to OpenTelemetry logs. + LogToConsole = args["console"].as(LogDefault); + + if (args.has_option("help")) + { + Usage(argparser, scenarios); + return 0; + } + + // Initialize OpenTelemetry Tracers and Loggers. + // TBD: Metrics. + InitTracer(); + InitLogger(); + + if (args.has_option("verbose")) + { + Azure::Core::Diagnostics::Logger::SetLevel(Azure::Core::Diagnostics::Logger::Level::Verbose); + } + else + { + Azure::Core::Diagnostics::Logger::SetLevel( + Azure::Core::Diagnostics::Logger::Level::Informational); + } + + std::cout << "===\tINITIALIZE TEST\t===" << std::endl; + scenario->Initialize(args); + + std::cout << "===\tRUN TEST\t===" << std::endl; + scenario->Run(); + + std::cout << "===\tCLEANUP TEST\t===" << std::endl; + scenario->Cleanup(); + } + catch (std::exception const& ex) + { + std::cerr << "Test failed due to exception thrown: " << ex.what() << std::endl; + } + CleanupTracer(); + return 0; +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/argagg.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/argagg.hpp new file mode 100644 index 000000000..d54e3c33d --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/argagg.hpp @@ -0,0 +1,1541 @@ +/* + * @file + * @brief + * Defines a very simple command line argument parser. + * + * @copyright + * Copyright (c) 2018 Viet The Nguyen + * + * @copyright + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * @copyright + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * @copyright + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief + * There are only two hard things in Computer Science: cache invalidation and + * naming things (Phil Karlton). + * + * The names of types have to be succinct and clear. This has turned out to be + * a more difficult thing than I expected. Here you'll find a quick overview of + * the type names you'll find in this namespace (and thus "library"). + * + * When a program is invoked it is passed a number of "command line arguments". + * Each of these "arguments" is a string (C-string to be more precise). An + * "option" is a command line argument that has special meaning. This library + * recognizes a command line argument as a potential option if it starts with a + * dash ('-') or double-dash ('--'). + * + * A "parser" is a set of "definitions" (not a literal std::set but rather a + * std::vector). A parser is represented by the argagg::parser struct. + * + * A "definition" is a structure with four components that define what + * "options" are recognized. The four components are the name of the option, + * the strings that represent the option, the option's help text, and how many + * arguments the option should expect. "Flags" are the individual strings that + * represent the option ("-v" and "--verbose" are flags for the "verbose" + * option). A definition is represented by the argagg::definition struct. + * + * Note at this point that the word "option" can be used interchangeably to + * mean the notion of an option and the actual instance of an option given a + * set of command line arguments. To be unambiguous we use a "definition" to + * represent the notion of an option and an "option result" to represent an + * actual option parsed from a set of command line arguments. An "option + * result" is represented by the argagg::option_result struct. + * + * There's one more wrinkle to this: an option can show up multiple times in a + * given set of command line arguments. For example, "-n 1 -n 2 -n 3". This + * will parse into three distinct argagg::option_result instances, but all of + * them correspond to the same argagg::definition. We aggregate these into the + * argagg::option_results struct which represents "all parser results for a + * given option definition". This argagg::option_results is basically a + * std::vector of argagg::option_result. + * + * Options aren't the only thing parsed though. Positional arguments are also + * parsed. Thus a parser produces a result that contains both option results + * and positional arguments. The parser results are represented by the + * argagg::parser_results struct. All option results are stored in a mapping + * from option name to the argagg::option_results. All positional arguments are + * simply stored in a vector of C-strings. + */ +namespace argagg { + +/** + * @brief + * This exception is thrown when a long option is parsed and is given an + * argument using the "=" syntax but the option doesn't expect an argument. + */ +struct unexpected_argument_error : public std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +/** + * @brief + * This exception is thrown when an option is parsed unexpectedly such as when + * an argument was expected for a previous option or if an option was found + * that has not been defined. + */ +struct unexpected_option_error : public std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +/** + * @brief + * This exception is thrown when an option requires an argument but is not + * provided one. This can happen if another flag was found after the option or + * if we simply reach the end of the command line arguments. + */ +struct option_lacks_argument_error : public std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +/** + * @brief + * This exception is thrown when an option's flag is invalid. This can be the + * case if the flag is not prefixed by one or two hyphens or contains non + * alpha-numeric characters after the hyphens. See is_valid_flag_definition() + * for more details. + */ +struct invalid_flag : public std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +/** + * @brief + * This exception is thrown when an unknown option is requested by name from an + * argagg::parser_results through the indexing operator ([]). + */ +struct unknown_option : public std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +/** + * @brief + * The set of template instantiations that convert C-strings to other types for + * the option_result::as(), option_results::as(), parser_results::as(), and + * parser_results::all_as() methods are placed in this namespace. + */ +namespace convert { + + /** + * @brief + * Explicit instantiations of this function are used to convert arguments to + * types. + */ + template T arg(const char* arg); + + /** + * @brief + * For simple types the main extension point for adding argument conversions + * is argagg::convert::arg(). However, for complex types such as templated + * types partial specialization of a helper struct is required. This struct + * provides that extension point. The default, generic implementation of + * argagg::convert::arg() calls converter::convert(). + * + * @ref argagg::csv + */ + template struct converter + { + static T convert(const char* arg); + }; + + /** + * @brief + * A utility function for parsing an argument as a delimited list. To use, + * initialize a const char* pointer to the start of argument string. Then + * call parse_next_component(), providing that pointer, a mutable reference + * to where the parsed argument will go, and optionally the delimiting + * character. The argument string will be read up to the next delimiting + * character and then converted using + * argagg::convert::arg(). The pointer is then + * incremented accordingly. If the delimiting character is no longer found + * then false is returned meaning that parsing the list can be considered + * finished. + * + * @code + #include + + struct position3 { + double x; + double y; + double z; + }; + + namespace argagg { + namespace convert { + template <> + position3 arg(const char* s) + { + position3 result {0.0, 0.0, 0.0}; + if (!parse_next_component(s, result.x)) { + // could potentially throw an error if you require that at least two + // components exist in the list + return result; + } + if (!parse_next_component(s, result.y)) { + return result; + } + if (!parse_next_component(s, result.z)) { + return result; + } + return result; + } + } // namespace convert + } // namespace argagg + + int main(int argc, char** argv) + { + argagg::parser argparser {{ + { "origin", {"-o", "--origin"}, + "origin as position3 specified as a comma separated list of " + "components (e.g. '1,2,3')", 1}, + }}; + argagg::parser_results args = argparser.parse(argc, argv); + auto my_position = args["origin"].as(); + // ... + return 0; + } + @endcode + */ + template + bool parse_next_component(const char*& s, T& out_arg, const char delim = ','); + +} // namespace convert + +/** + * @brief + * Represents a single option parse result. + * + * You can check if this has an argument by using the implicit boolean + * conversion. + */ +struct option_result +{ + + /** + * @brief + * Argument parsed for this single option. If no argument was parsed this + * will be set to nullptr. + */ + const char* arg; + + /** + * @brief + * Converts the argument parsed for this single option instance into the + * given type using the type matched conversion function + * argagg::convert::arg(). If there was not an argument parsed for this + * single option instance then a argagg::option_lacks_argument_error + * exception is thrown. The specific conversion function may throw other + * exceptions. + */ + template T as() const; + + /** + * @brief + * Converts the argument parsed for this single option instance into the + * given type using the type matched conversion function + * argagg::convert::arg(). If there was not an argument parsed for this + * single option instance then the provided default value is returned + * instead. If the conversion function throws an exception then it is ignored + * and the default value is returned. + */ + template T as(const T& t) const; + + /** + * @brief + * Since we have the argagg::option_result::as() API we might as well alias + * it as an implicit conversion operator. This performs implicit conversion + * using the argagg::option_result::as() method. + * + * @note + * An implicit boolean conversion specialization exists which returns false + * if there is no argument for this single option instance and true + * otherwise. This specialization DOES NOT convert the argument to a bool. If + * you need to convert the argument to a bool then use the as() API. + */ + template operator T() const; + + /** + * @brief + * Explicitly define a unary not operator that wraps the implicit boolean + * conversion specialization in case the compiler can't do it automatically. + */ + bool operator!() const; +}; + +/** + * @brief + * Represents multiple option parse results for a single option. If treated as + * a single parse result it defaults to the last parse result. Note that an + * instance of this struct is always created even if no option results are + * parsed for a given definition. In that case it will simply be empty. + * + * To check if the associated option showed up at all simply use the implicit + * boolean conversion or check if count() is greater than zero. + */ +struct option_results +{ + + /** + * @brief + * All option parse results for this option. + */ + std::vector all; + + /** + * @brief + * Gets the number of times the option shows up. + */ + size_t count() const; + + /** + * @brief + * Gets a single option parse result by index. + */ + option_result& operator[](size_t index); + + /** + * @brief + * Gets a single option result by index. + */ + const option_result& operator[](size_t index) const; + + /** + * @brief + * Converts the argument parsed for the LAST option parse result for the + * parent definition to the provided type. For example, if this was for "-f 1 + * -f 2 -f 3" then calling this method for an integer type will return 3. If + * there are no option parse results then a std::out_of_range exception is + * thrown. Any exceptions thrown by option_result::as() are not + * handled. + */ + template T as() const; + + /** + * @brief + * Converts the argument parsed for the LAST option parse result for the + * parent definition to the provided type. For example, if this was for "-f 1 + * -f 2 -f 3" then calling this method for an integer type will return 3. If + * there are no option parse results then the provided default value is + * returned instead. + */ + template T as(const T& t) const; + + /** + * @brief + * Since we have the option_results::as() API we might as well alias + * it as an implicit conversion operator. This performs implicit conversion + * using the option_results::as() method. + * + * @note + * An implicit boolean conversion specialization exists which returns false + * if there is no argument for this single option instance and true + * otherwise. This specialization DOES NOT convert the argument to a bool. If + * you need to convert the argument to a bool then use the as() API. + */ + template operator T() const; + + /** + * @brief + * Explicitly define a unary not operator that wraps the implicit boolean + * conversion specialization in case the compiler can't do it automatically. + */ + bool operator!() const; +}; + +/** + * @brief + * Represents all results of the parser including options and positional + * arguments. + */ +struct parser_results +{ + + /** + * @brief + * Returns the name of the program from the original arguments list. This is + * always the first argument. + */ + const char* program; + + /** + * @brief + * Maps from definition name to the structure which contains the parser + * results for that definition. + */ + std::unordered_map options; + + /** + * @brief + * Vector of positional arguments. + */ + std::vector pos; + + /** + * @brief + * Used to check if an option was specified at all. + */ + bool has_option(const std::string& name) const; + + /** + * @brief + * Get the parser results for the given definition. If the definition never + * showed up then the exception from the unordered_map access will bubble + * through so check if the flag exists in the first place with has_option(). + */ + option_results& operator[](const std::string& name); + + /** + * @brief + * Get the parser results for the given definition. If the definition never + * showed up then the exception from the unordered_map access will bubble + * through so check if the flag exists in the first place with has_option(). + */ + const option_results& operator[](const std::string& name) const; + + /** + * @brief + * Gets the number of positional arguments. + */ + size_t count() const; + + /** + * @brief + * Gets a positional argument by index. + */ + const char* operator[](size_t index) const; + + /** + * @brief + * Gets a positional argument converted to the given type. + */ + template T as(size_t i = 0) const; + + /** + * @brief + * Gets all positional arguments converted to the given type. + */ + template std::vector all_as() const; +}; + +/** + * @brief + * An option definition which essentially represents what an option is. + */ +struct definition +{ + + /** + * @brief + * Name of the option. Option parser results are keyed by this name. + */ + const std::string name; + + /** + * @brief + * List of strings to match that correspond to this option. Should be fully + * specified with hyphens (e.g. "-v" or "--verbose"). + */ + std::vector flags; + + /** + * @brief + * Help string for this option. + */ + std::string help; + + /** + * @brief + * Number of arguments this option requires. Must be 0 or 1. All other values + * have undefined behavior. Okay, the code actually works with positive + * values in general, but it's unorthodox command line behavior. + */ + unsigned int num_args; + + /** + * @brief + * Returns true if this option does not want any arguments. + */ + bool wants_no_arguments() const; + + /** + * @brief + * Returns true if this option requires arguments. + */ + bool requires_arguments() const; +}; + +/** + * @brief + * Checks whether or not a command line argument should be processed as an + * option flag. This is very similar to is_valid_flag_definition() but must + * allow for short flag groups (e.g. "-abc") and equal-assigned long flag + * arguments (e.g. "--output=foo.txt"). + */ +bool cmd_line_arg_is_option_flag(const char* s); + +/** + * @brief + * Checks whether a flag in an option definition is valid. I suggest reading + * through the function source to understand what dictates a valid. + */ +bool is_valid_flag_definition(const char* s); + +/** + * @brief + * Tests whether or not a valid flag is short. Assumes the provided cstring is + * already a valid flag. + */ +bool flag_is_short(const char* s); + +/** + * @brief + * Contains two maps which aid in option parsing. The first map, @ref + * short_map, maps from a short flag (just a character) to a pointer to the + * original @ref definition that the flag represents. The second map, @ref + * long_map, maps from a long flag (an std::string) to a pointer to the + * original @ref definition that the flag represents. + * + * This object is usually a temporary that only exists during the parsing + * operation. It is typically constructed using @ref validate_definitions(). + */ +struct parser_map +{ + + /** + * @brief + * Maps from a short flag (just a character) to a pointer to the original + * @ref definition that the flag represents. + */ + std::array short_map; + + /** + * @brief + * Maps from a long flag (an std::string) to a pointer to the original @ref + * definition that the flag represents. + */ + std::unordered_map long_map; + + /** + * @brief + * Returns true if the provided short flag exists in the map object. + */ + bool known_short_flag(const char flag) const; + + /** + * @brief + * If the short flag exists in the map object then it is returned by this + * method. If it doesn't then nullptr will be returned. + */ + const definition* get_definition_for_short_flag(const char flag) const; + + /** + * @brief + * Returns true if the provided long flag exists in the map object. + */ + bool known_long_flag(const std::string& flag) const; + + /** + * @brief + * If the long flag exists in the map object then it is returned by this + * method. If it doesn't then nullptr will be returned. + */ + const definition* get_definition_for_long_flag(const std::string& flag) const; +}; + +/** + * @brief + * Validates a collection (specifically an std::vector) of @ref definition + * objects by checking if the contained flags are valid. If the set of @ref + * definition objects is not valid then an exception is thrown. Upon successful + * validation a @ref parser_map object is returned. + */ +parser_map validate_definitions(const std::vector& definitions); + +/** + * @brief + * A list of option definitions used to inform how to parse arguments. + */ +struct parser +{ + + /** + * @brief + * Vector of the option definitions which inform this parser how to parse + * the command line arguments. + */ + std::vector definitions; + + /** + * @brief + * Parses the provided command line arguments and returns the results as + * @ref parser_results. + * + * @note + * This method is not thread-safe and assumes that no modifications are made + * to the definitions member field during the extent of this method call. + */ + parser_results parse(int argc, const char** argv, bool posOnly = true) const; + + /** + * @brief + * Through strict interpretation of pointer casting rules, despite this being + * a safe operation, C++ doesn't allow implicit casts from char** to + * const char** so here's an overload that performs a const_cast, + * which is typically frowned upon but is safe here. + */ + parser_results parse(int argc, char** argv, bool posOnly) const; +}; + +/** + * @brief + * A convenience output stream that will accumulate what is streamed to it and + * then, on destruction, format the accumulated string (via the + * argagg::fmt_string() function) to the provided std::ostream. + * + * Example use: + * + * @code + * { + * argagg::fmt_ostream f(std::cerr); + * f << "Usage: " << really_long_string << '\n'; + * } // on destruction here the formatted string will be streamed to std::cerr + * @endcode + */ +struct fmt_ostream : public std::ostringstream +{ + + /** + * @brief + * Reference to the final output stream that the formatted string will be + * streamed to. + */ + std::ostream& output; + + /** + * @brief + * Construct to output to the provided output stream when this object is + * destroyed. + */ + fmt_ostream(std::ostream& output); + + /** + * @brief + * Special destructor that will format the accumulated string using fmt (via + * the argagg::fmt_string() function) and stream it to the std::ostream + * stored. + */ + ~fmt_ostream(); +}; + +/** + * @brief + * Processes the provided string using the fmt utility and returns the + * resulting output as a string. Not the most efficient (in time or space) but + * gets the job done. + */ +std::string fmt_string(const std::string& s); + +} // namespace argagg + +/** + * @brief + * Writes the option help to the given stream. + */ +std::ostream& operator<<(std::ostream& os, const argagg::parser& x); + +// ---- end of declarations, header-only implementations follow ---- + +namespace argagg { + +template T option_result::as() const +{ + if (this->arg) + { + return convert::arg(this->arg); + } + else + { + throw option_lacks_argument_error("option has no argument"); + } +} + +template T option_result::as(const T& t) const +{ + if (this->arg) + { + try + { + return convert::arg(this->arg); + } + catch (...) + { + return t; + } + } + else + { + // I actually think this will never happen. To call this method you have + // to access a specific option_result for an option. If there's a + // specific option_result then the option was found. If the option + // requires an argument then it will definitely have an argument + // otherwise the parser would have complained. + return t; + } +} + +template option_result::operator T() const { return this->as(); } + +template <> inline option_result::operator bool() const { return this->arg != nullptr; } + +inline bool option_result::operator!() const { return !static_cast(*this); } + +inline size_t option_results::count() const { return this->all.size(); } + +inline option_result& option_results::operator[](size_t index) { return this->all[index]; } + +inline const option_result& option_results::operator[](size_t index) const +{ + return this->all[index]; +} + +template T option_results::as() const +{ + if (this->all.size() == 0) + { + throw std::out_of_range("no option arguments to convert"); + } + return this->all.back().as(); +} + +template T option_results::as(const T& t) const +{ + if (this->all.size() == 0) + { + return t; + } + return this->all.back().as(t); +} + +template option_results::operator T() const { return this->as(); } + +template <> inline option_results::operator bool() const { return this->all.size() > 0; } + +inline bool option_results::operator!() const { return !static_cast(*this); } + +inline bool parser_results::has_option(const std::string& name) const +{ + const auto it = this->options.find(name); + return (it != this->options.end()) && it->second.all.size() > 0; +} + +inline option_results& parser_results::operator[](const std::string& name) +try +{ + return this->options.at(name); +} +catch (std::out_of_range const& e) +{ + std::ostringstream msg; + msg << "no option named \"" << name << "\" in parser_results." << e.what(); + throw unknown_option(msg.str()); +} + +inline const option_results& parser_results::operator[](const std::string& name) const +try +{ + return this->options.at(name); +} +catch (const std::out_of_range& e) +{ + std::ostringstream msg; + msg << "no option named \"" << name << "\" in parser_results." << e.what(); + throw unknown_option(msg.str()); +} + +inline size_t parser_results::count() const { return this->pos.size(); } + +inline const char* parser_results::operator[](size_t index) const { return this->pos[index]; } + +template T parser_results::as(size_t i) const { return convert::arg(this->pos[i]); } + +template std::vector parser_results::all_as() const +{ + std::vector v(this->pos.size()); + std::transform(this->pos.begin(), this->pos.end(), v.begin(), [](const char* arg) { + return convert::arg(arg); + }); + return v; +} + +inline bool definition::wants_no_arguments() const { return this->num_args == 0; } + +inline bool definition::requires_arguments() const { return this->num_args > 0; } + +inline bool cmd_line_arg_is_option_flag(const char* s) +{ + auto len = std::strlen(s); + + // The shortest possible flag has two characters: a hyphen and an + // alpha-numeric character or ? + if (len < 2) + { + return false; + } + + // All flags must start with a hyphen. + if (s[0] != '-') + { + return false; + } + + // Shift the name forward by a character to account for the initial hyphen. + // This means if s was originally "-v" then name will be "v". + const char* name = s + 1; + + // Check if we're dealing with a long flag. + bool is_long = false; + if (s[1] == '-') + { + is_long = true; + + // Just -- is not a valid flag. + if (len == 2) + { + return false; + } + + // Shift the name forward to account for the extra hyphen. This means if s + // was originally "--output" then name will be "output". + name = s + 2; + } + + // The first character of the flag name must be alpha-numeric. This is to + // prevent things like "---a" from being valid flags. + len = std::strlen(name); + if (!std::isalnum(static_cast(name[0])) && name[0] != '?') + { + return false; + } + + // At this point in is_valid_flag_definition() we would check if the short + // flag has only one character. At command line specification you can group + // short flags together or even add an argument to a short flag without a + // space delimiter. Thus we don't check if this has only one character + // because it might not. + + // If this is a long flag then we expect all characters *up to* an equal sign + // to be alpha-numeric or a hyphen. After the equal sign you are specify the + // argument to a long flag which can be basically anything. + if (is_long) + { + bool encountered_equal = false; + return std::all_of(name, name + len, [&](const char& c) { + if (encountered_equal) + { + return true; + } + else + { + if (c == '=') + { + encountered_equal = true; + return true; + } + return std::isalnum(static_cast(c)) || c == '-'; + } + }); + } + + // At this point we are not dealing with a long flag. We already checked that + // the first character is alpha-numeric so we've got the case of a single + // short flag covered. This might be a short flag group though and we might + // be tempted to check that each character of the short flag group is + // alpha-numeric. However, you can specify the argument for a short flag + // without a space delimiter (e.g. "-I/usr/local/include") so you can't tell + // if the rest of a short flag group is part of the argument or not unless + // you know what is a defined flag or not. We leave that kind of processing + // to the parser. + return true; +} + +inline bool is_valid_flag_definition(const char* s) +{ + auto len = std::strlen(s); + + // The shortest possible flag has two characters: a hyphen and an + // alpha-numeric character. + if (len < 2) + { + return false; + } + + // All flags must start with a hyphen. + if (s[0] != '-') + { + return false; + } + + // Shift the name forward by a character to account for the initial hyphen. + // This means if s was originally "-v" then name will be "v". + const char* name = s + 1; + + // Check if we're dealing with a long flag. + bool is_long = false; + if (s[1] == '-') + { + is_long = true; + + // Just -- is not a valid flag. + if (len == 2) + { + return false; + } + + // Shift the name forward to account for the extra hyphen. This means if s + // was originally "--output" then name will be "output". + name = s + 2; + } + + // The first character of the flag name must be alpha-numeric. This is to + // prevent things like "---a" from being valid flags. + // We do want to allow '?' as the first character of a flag name for help. + len = std::strlen(name); + if (!std::isalnum(static_cast(name[0])) && name[0] != '?') + { + return false; + } + + // If this is a short flag then it must only have one character. + if (!is_long && len > 1) + { + return false; + } + + // The rest of the characters must be alpha-numeric, but long flags are + // allowed to have hyphens too. + return std::all_of(name + 1, name + len, [&](const char& c) { + return std::isalnum(static_cast(c)) || (c == '-' && is_long); + }); +} + +inline bool flag_is_short(const char* s) +{ + return s[0] == '-' && (std::isalnum(static_cast(s[1])) || s[1] == '?'); +} + +inline bool parser_map::known_short_flag(const char flag) const +{ + return this->short_map[flag] != nullptr; +} + +inline const definition* parser_map::get_definition_for_short_flag(const char flag) const +{ + return this->short_map[flag]; +} + +inline bool parser_map::known_long_flag(const std::string& flag) const +{ + const auto existing_long_flag = this->long_map.find(flag); + return existing_long_flag != long_map.end(); +} + +inline const definition* parser_map::get_definition_for_long_flag(const std::string& flag) const +{ + const auto existing_long_flag = this->long_map.find(flag); + if (existing_long_flag == long_map.end()) + { + return nullptr; + } + return existing_long_flag->second; +} + +inline parser_map validate_definitions(const std::vector& definitions) +{ + std::unordered_map long_map; + parser_map map{{{nullptr}}, std::move(long_map)}; + + for (auto& defn : definitions) + { + + if (defn.flags.size() == 0) + { + std::ostringstream msg; + msg << "option \"" << defn.name << "\" has no flag definitions"; + throw invalid_flag(msg.str()); + } + + for (auto& flag : defn.flags) + { + + if (!is_valid_flag_definition(flag.data())) + { + std::ostringstream msg; + msg << "flag \"" << flag << "\" specified for option \"" << defn.name << "\" is invalid"; + throw invalid_flag(msg.str()); + } + + if (flag_is_short(flag.data())) + { + const int short_flag_letter = flag[1]; + const auto existing_short_flag = map.short_map[short_flag_letter]; + bool short_flag_already_exists = (existing_short_flag != nullptr); + if (short_flag_already_exists) + { + std::ostringstream msg; + msg << "duplicate short flag \"" << flag << "\" found, specified by both option \"" + << defn.name << "\" and option \"" << existing_short_flag->name; + throw invalid_flag(msg.str()); + } + map.short_map[short_flag_letter] = &defn; + continue; + } + + // If we're here then this is a valid, long-style flag. + if (map.known_long_flag(flag)) + { + const auto existing_long_flag = map.get_definition_for_long_flag(flag); + std::ostringstream msg; + msg << "duplicate long flag \"" << flag << "\" found, specified by both option \"" + << defn.name << "\" and option \"" << existing_long_flag->name; + throw invalid_flag(msg.str()); + } + map.long_map.insert(std::make_pair(flag, &defn)); + } + } + + return map; +} + +inline parser_results parser::parse(int argc, const char** argv, bool posOnly) const +{ + // Inspect each definition to see if its valid. You may wonder "why don't + // you do this validation on construction?" I had thought about it but + // realized that since I've made the parser an aggregate type (granted it + // just "aggregates" a single vector) I would need to track any changes to + // the definitions vector and re-run the validity check in order to + // maintain this expected "validity invariant" on the object. That would + // then require hiding the definitions vector as a private entry and then + // turning the parser into a thin interface (by re-exposing setters and + // getters) to the vector methods just so that I can catch when the + // definition has been modified. It seems much simpler to just enforce the + // validity when you actually want to parse because it's at the moment of + // parsing that you know the definitions are complete. + parser_map map = validate_definitions(this->definitions); + + // Initialize the parser results that we'll be returning. Store the program + // name (assumed to be the first command line argument) and initialize + // everything else as empty. + std::unordered_map options{}; + std::vector pos; + parser_results results{argv[0], std::move(options), std::move(pos)}; + + // Add an empty option result for each definition. + for (const auto& defn : this->definitions) + { + option_results opt_results{{}}; + results.options.insert(std::make_pair(defn.name, opt_results)); + } + + // Don't start off ignoring flags. We only ignore flags after a -- shows up + // in the command line arguments. + bool ignore_flags = false; + + // Keep track of any options that are expecting arguments. + const char* last_flag_expecting_args = nullptr; + option_result* last_option_expecting_args = nullptr; + unsigned int num_option_args_to_consume = 0; + + // Get pointers to pointers so we can treat the raw pointer array as an + // iterator for standard library algorithms. This isn't used yet but can be + // used to template this function to work on iterators over strings or + // C-strings. + const char** arg_i = argv + 1; + const char** arg_end = argv + argc; + + while (arg_i != arg_end) + { + auto arg_i_cstr = *arg_i; + auto arg_i_len = std::strlen(arg_i_cstr); + + // Some behavior to note: if the previous option is expecting an argument + // then the next entry will be treated as a positional argument even if + // it looks like a flag. + bool treat_as_positional_argument + = (ignore_flags || num_option_args_to_consume > 0 + || !cmd_line_arg_is_option_flag(arg_i_cstr)); + if (treat_as_positional_argument) + { + + // If last option is expecting some specific positive number of + // arguments then give this argument to that option, *regardless of + // whether or not the argument looks like a flag or is the special "--" + // argument*. + if (num_option_args_to_consume > 0) + { + last_option_expecting_args->arg = arg_i_cstr; + --num_option_args_to_consume; + ++arg_i; + continue; + } + + // Now we check if this is just "--" which is a special argument that + // causes all following arguments to be treated as non-options and is + // itself discarded. + if (std::strncmp(arg_i_cstr, "--", 2) == 0 && arg_i_len == 2) + { + ignore_flags = true; + ++arg_i; + continue; + } + + // If there are no expectations for option arguments then simply use + // this argument as a positional argument. + results.pos.push_back(arg_i_cstr); + ++arg_i; + continue; + } + + // Reset the "expecting argument" state. + last_flag_expecting_args = nullptr; + last_option_expecting_args = nullptr; + num_option_args_to_consume = 0; + if (!posOnly) + { + // If we're at this point then we're definitely dealing with something + // that is flag-like and has hyphen as the first character and has a + // length of at least two characters. How we handle this potential flag + // depends on whether or not it is a long-option so we check that first. + bool is_long_flag = (arg_i_cstr[1] == '-'); + + if (is_long_flag) + { + + // Long flags have a complication: their arguments can be specified + // using an '=' character right inside the argument. That means an + // argument like "--output=foobar.txt" is actually an option with flag + // "--output" and argument "foobar.txt". So we look for the first + // instance of the '=' character and keep it in long_flag_arg. If + // long_flag_arg is nullptr then we didn't find '='. We need the + // flag_len to construct long_flag_str below. + auto long_flag_arg = std::strchr(arg_i_cstr, '='); + size_t flag_len = arg_i_len; + if (long_flag_arg != nullptr) + { + flag_len = long_flag_arg - arg_i_cstr; + } + std::string long_flag_str(arg_i_cstr, flag_len); + + if (!map.known_long_flag(long_flag_str)) + { + std::ostringstream msg; + msg << "found unexpected flag: " << long_flag_str; + throw unexpected_option_error(msg.str()); + } + + const auto defn = map.get_definition_for_long_flag(long_flag_str); + + if (long_flag_arg != nullptr && defn->num_args == 0) + { + std::ostringstream msg; + msg << "found argument for option not expecting an argument: " << arg_i_cstr; + throw unexpected_argument_error(msg.str()); + } + + // We've got a legitimate, known long flag option so we add an option + // result. This option result initially has an arg of nullptr, but that + // might change in the following block. + auto& opt_results = results.options[defn->name]; + option_result opt_result{nullptr}; + opt_results.all.push_back(std::move(opt_result)); + + if (defn->requires_arguments()) + { + bool there_is_an_equal_delimited_arg = (long_flag_arg != nullptr); + if (there_is_an_equal_delimited_arg) + { + // long_flag_arg would be "=foo" in the "--output=foo" case so we + // increment by 1 to get rid of the equal sign. + opt_results.all.back().arg = long_flag_arg + 1; + } + else + { + last_flag_expecting_args = arg_i_cstr; + last_option_expecting_args = &(opt_results.all.back()); + num_option_args_to_consume = defn->num_args; + } + } + + ++arg_i; + continue; + } + + // If we've made it here then we're looking at either a short flag or a + // group of short flags. Short flags can be grouped together so long as + // they don't require any arguments unless the option that does is the + // last in the group ("-o x -v" is okay, "-vo x" is okay, "-ov x" is + // not). So starting after the dash we're going to process each character + // as if it were a separate flag. Note "sf_idx" stands for "short flag + // index". + for (size_t sf_idx = 1; sf_idx < arg_i_len; ++sf_idx) + { + const auto short_flag = arg_i_cstr[sf_idx]; + + if (!std::isalnum(static_cast(short_flag)) && short_flag != '?') + { + std::ostringstream msg; + msg << "found non-alphanumeric character '" << arg_i_cstr[sf_idx] << "' in flag group '" + << arg_i_cstr << "'"; + throw std::domain_error(msg.str()); + } + + if (!map.known_short_flag(short_flag)) + { + std::ostringstream msg; + msg << "found unexpected flag '" << arg_i_cstr[sf_idx] << "' in flag group '" + << arg_i_cstr << "'"; + throw unexpected_option_error(msg.str()); + } + + auto defn = map.get_definition_for_short_flag(short_flag); + auto& opt_results = results.options[defn->name]; + + // Create an option result with an empty argument (for now) and add it + // to this option's results. + option_result opt_result{nullptr}; + opt_results.all.push_back(std::move(opt_result)); + + if (defn->requires_arguments()) + { + + // If this short flag's option requires an argument and we're the + // last flag in the short flag group then just put the parser into + // "expecting argument for last option" state and move onto the next + // command line argument. + bool is_last_short_flag_in_group = (sf_idx == arg_i_len - 1); + if (is_last_short_flag_in_group) + { + last_flag_expecting_args = arg_i_cstr; + last_option_expecting_args = &(opt_results.all.back()); + num_option_args_to_consume = defn->num_args; + break; + } + + // If this short flag's option requires an argument and we're NOT the + // last flag in the short flag group then we automatically consume + // the rest of the short flag group as the argument for this flag. + // This is how we get the POSIX behavior of being able to specify a + // flag's arguments without a white space delimiter (e.g. + // "-I/usr/local/include"). + opt_results.all.back().arg = arg_i_cstr + sf_idx + 1; + break; + } + } + } + ++arg_i; + continue; + } + + // If we're done with all of the arguments but are still expecting + // arguments for a previous option then we haven't satisfied that option. + // This is an error. + if (num_option_args_to_consume > 0) + { + std::ostringstream msg; + msg << "last option \"" << last_flag_expecting_args + << "\" expects an argument but the parser ran out of command line " + << "arguments to parse"; + throw option_lacks_argument_error(msg.str()); + } + + return results; +} + +inline parser_results parser::parse(int argc, char** argv, bool posOnly = false) const +{ + return parse(argc, const_cast(argv), posOnly); +} + +namespace convert { + + /** + * @brief + * Templated function for conversion to T using the @ref std::strtol() + * function. This is used for anything long length or shorter (long, int, + * short, char). + */ + template inline T long_(const char* arg) + { + char* endptr = nullptr; + errno = 0; + T ret = static_cast(std::strtol(arg, &endptr, 0)); + if (endptr == arg) + { + std::ostringstream msg; + msg << "unable to convert argument to integer: \"" << arg << "\""; + throw std::invalid_argument(msg.str()); + } + if (errno == ERANGE) + { + throw std::out_of_range("argument numeric value out of range"); + } + return ret; + } + + /** + * @brief + * Templated function for conversion to T using the @ref std::strtoll() + * function. This is used for anything long long length or shorter (long + * long). + */ + template inline T long_long_(const char* arg) + { + char* endptr = nullptr; + errno = 0; + T ret = static_cast(std::strtoll(arg, &endptr, 0)); + if (endptr == arg) + { + std::ostringstream msg; + msg << "unable to convert argument to integer: \"" << arg << "\""; + throw std::invalid_argument(msg.str()); + } + if (errno == ERANGE) + { + throw std::out_of_range("argument numeric value out of range"); + } + return ret; + } + +#define DEFINE_CONVERSION_FROM_LONG_(TYPE) \ + template <> inline TYPE arg(const char* arg) { return long_(arg); } + + DEFINE_CONVERSION_FROM_LONG_(char) + DEFINE_CONVERSION_FROM_LONG_(unsigned char) + DEFINE_CONVERSION_FROM_LONG_(signed char) + DEFINE_CONVERSION_FROM_LONG_(short) + DEFINE_CONVERSION_FROM_LONG_(unsigned short) + DEFINE_CONVERSION_FROM_LONG_(int) + DEFINE_CONVERSION_FROM_LONG_(unsigned int) + DEFINE_CONVERSION_FROM_LONG_(long) + DEFINE_CONVERSION_FROM_LONG_(unsigned long) + +#undef DEFINE_CONVERSION_FROM_LONG_ + +#define DEFINE_CONVERSION_FROM_LONG_LONG_(TYPE) \ + template <> inline TYPE arg(const char* arg) { return long_long_(arg); } + + DEFINE_CONVERSION_FROM_LONG_LONG_(long long) + DEFINE_CONVERSION_FROM_LONG_LONG_(unsigned long long) + +#undef DEFINE_CONVERSION_FROM_LONG_LONG_ + + template T arg(const char* arg) { return converter::convert(arg); } + + template <> inline bool arg(const char* arg) { return argagg::convert::arg(arg) != 0; } + + template <> inline float arg(const char* arg) + { + char* endptr = nullptr; + errno = 0; + float ret = std::strtof(arg, &endptr); + if (endptr == arg) + { + std::ostringstream msg; + msg << "unable to convert argument to integer: \"" << arg << "\""; + throw std::invalid_argument(msg.str()); + } + if (errno == ERANGE) + { + throw std::out_of_range("argument numeric value out of range"); + } + return ret; + } + + template <> inline double arg(const char* arg) + { + char* endptr = nullptr; + errno = 0; + double ret = std::strtod(arg, &endptr); + if (endptr == arg) + { + std::ostringstream msg; + msg << "unable to convert argument to integer: \"" << arg << "\""; + throw std::invalid_argument(msg.str()); + } + if (errno == ERANGE) + { + throw std::out_of_range("argument numeric value out of range"); + } + return ret; + } + + template <> inline const char* arg(const char* arg) { return arg; } + + template <> inline std::string arg(const char* arg) { return std::string(arg); } + + template bool parse_next_component(const char*& s, T& out_arg, const char delim) + { + const char* begin = s; + s = std::strchr(s, delim); + if (s == nullptr) + { + std::string arg_str(begin); + out_arg = argagg::convert::arg(arg_str.c_str()); + return false; + } + else + { + std::string arg_str(begin, s - begin); + out_arg = argagg::convert::arg(arg_str.c_str()); + s += 1; + return true; + } + } + +} // namespace convert + +inline fmt_ostream::fmt_ostream(std::ostream& output) : std::ostringstream(), output(output) {} + +inline fmt_ostream::~fmt_ostream() { output << fmt_string(this->str()); } + +inline std::string lstrip(const std::string& text) +{ + std::string result = text; + + result.erase(result.begin(), std::find_if(result.begin(), result.end(), [](int ch) { + return !std::isspace(ch); + })); + + return result; +} + +inline std::string rstrip(const std::string& text) +{ + std::string result = text; + + result.erase( + std::find_if(result.rbegin(), result.rend(), [](int ch) { return !std::isspace(ch); }).base(), + result.end()); + + return result; +} + +inline std::string construct_line(const std::string& indent, const std::string& contents) +{ + return indent + rstrip(contents) + "\n"; +} + +/** + * @brief + * Return a wrapped version of a single line of text. + */ +inline std::string wrap_line(const std::string& single_line, const size_t wrap_width) +{ + auto indentation_spaces = single_line.find_first_not_of(" "); + if (indentation_spaces == std::string::npos) + { + indentation_spaces = 0; + } + + const auto line = lstrip(single_line); + const auto indent = std::string(indentation_spaces, ' '); + + std::string result; + + size_t position = 0; + size_t line_start = 0; + while (true) + { + const auto new_position = line.find_first_of(" ", position); + if (new_position == std::string::npos) + { + break; + } + + if (new_position + indentation_spaces > line_start + wrap_width) + { + result += construct_line(indent, line.substr(line_start, position - line_start - 1)); + + line_start = position; + } + + position = new_position + 1; + } + + return result + construct_line(indent, line.substr(line_start)); +} + +inline std::string fmt_string(const std::string& s) +{ + std::stringstream ss(s); + std::string line; + + std::string result; + + // Use default width of `fmt`. + const auto column_width = 75; + + while (std::getline(ss, line, '\n')) + { + result += wrap_line(line, column_width); + } + + return result; +} + +} // namespace argagg + +inline std::ostream& operator<<(std::ostream& os, const argagg::parser& x) +{ + for (auto& definition : x.definitions) + { + os << " "; + for (auto& flag : definition.flags) + { + os << flag; + if (flag != definition.flags.back()) + { + os << ", "; + } + } + os << "\n " << definition.help << '\n'; + } + return os; +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/eventhubs_stress_scenarios.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/eventhubs_stress_scenarios.hpp new file mode 100644 index 000000000..a189af34d --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/inc/eventhubs_stress_scenarios.hpp @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include "argagg.hpp" + +constexpr const char* EventHubsLoggerName = "eventhubs_stress_test"; + +extern bool LogToConsole; + +struct EventHubsScenarioOptions +{ + /** + * @brief The name of the scenario option. + * + */ + std::string Name; + /** + * @brief The list of sentinels for parsing the option from command line. i. e. [`-o`, + * `--option`]. + * + */ + std::vector Activators; + + /** + * @brief The message that is displayed in the command line when help is requested. + * + */ + std::string HelpMessage; + /** + * @brief The number of arguments expected after the sentinel for the test option. + * + */ + uint16_t ExpectedArgs; + + /** + * @brief Make an option to be mandatory to run the test. + * + */ + bool Required = false; + + /** + * @brief Make the option to be replaced with **** on all outputs + * + */ + bool SensitiveData = false; +}; + +class EventHubsStressScenario { +public: + EventHubsStressScenario(){}; + virtual const std::string& GetStressScenarioName() = 0; + virtual const std::vector& GetScenarioOptions() = 0; + virtual void Initialize(argagg::parser_results const& parserResults) = 0; + virtual void Run() = 0; + virtual void Cleanup() = 0; + +protected: + virtual ~EventHubsStressScenario(){}; +}; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/batch_stress_tests.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/batch_stress_tests.hpp new file mode 100644 index 000000000..b49bcadc6 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/batch_stress_tests.hpp @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "event_sender.hpp" +#include "eventhubs_stress_scenarios.hpp" + +#include +#include +#include +#include + +#include + +class BatchStressTest : public EventHubsStressScenario { +public: + BatchStressTest(); + +private: + std::string m_eventHubName; + std::string m_eventHubNamespace; + std::string m_eventHubConnectionString; + std::string m_checkpointStoreConnectionString; + std::string m_partitionId{"0"}; + + std::string m_tenantId; + std::string m_clientId; + std::string m_secret; + + uint32_t m_numberToSend{10000000}; + uint32_t m_batchSize{1000}; + std::chrono::system_clock::duration m_batchDuration{std::chrono::seconds(60)}; + uint32_t m_prefetchCount{0}; + std::uint32_t m_rounds{100}; + std::uint32_t m_paddingBytes{1024}; + std::uint32_t m_maxTimeouts{10}; + bool m_verbose{false}; + bool m_useSasCredential{false}; + + std::string m_scenarioName{"BatchStressTest"}; + + void SendEventsToPartition( + std::unique_ptr const& producerClient, + Azure::Core::Context const& context); + + void AddEndProperty( + Azure::Messaging::EventHubs::Models::EventData& event, + uint64_t expectedCount); + std::pair< + Azure::Messaging::EventHubs::Models::StartPosition, + Azure::Messaging::EventHubs::Models::EventHubPartitionProperties> + SendMessages(); + void ReceiveMessages(Azure::Messaging::EventHubs::Models::StartPosition const& startPosition); + void ConsumeForBatchTester( + uint32_t round, + Azure::Messaging::EventHubs::ConsumerClient& client, + Azure::Messaging::EventHubs::Models::StartPosition const& startPosition, + Azure::Core::Context const& context) const; + + // Inherited via EventHubsStressScenario + const std::string& GetStressScenarioName() override; + const std::vector& GetScenarioOptions() override; + void Initialize(argagg::parser_results const& parserResults) override; + + void Run() override; + void Cleanup() override; +}; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/event_sender.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/event_sender.hpp new file mode 100644 index 000000000..439718068 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/event_sender.hpp @@ -0,0 +1,114 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include "opentelemetry_helpers.hpp" + +#include +#include +#include +#include +#include + +#include + +struct EventSenderOptions +{ + std::string PartitionId; + std::uint32_t MessageLimit; + std::uint32_t NumberOfExtraBytes; +}; + +class EventSender { +public: + static std::pair< + Azure::Messaging::EventHubs::Models::StartPosition, + Azure::Messaging::EventHubs::Models::EventHubPartitionProperties> + SendEventsToPartition( + std::unique_ptr const& producerClient, + EventSenderOptions const& senderOptions, + Azure::Core::Context const& context) + { + auto sendEventsScope{CreateStressSpan("SendEventsToPartition")}; + + std::cout << "[BEGIN] Sending " << senderOptions.MessageLimit << " messages to partition " + << senderOptions.PartitionId << ", with messages of size " + << senderOptions.NumberOfExtraBytes << std::endl; + + Azure::Messaging::EventHubs::Models::EventHubPartitionProperties beforeSendProps; + { + auto getPropertiesSpan{ + CreateStressSpan("SendEventsToPartition::GetPartitionProperties begin")}; + beforeSendProps = producerClient->GetPartitionProperties(senderOptions.PartitionId, context); + } + std::vector bodyData(senderOptions.NumberOfExtraBytes, 'a'); + + Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; + batchOptions.PartitionId = senderOptions.PartitionId; + Azure::Messaging::EventHubs::EventDataBatch batch{producerClient->CreateBatch(batchOptions)}; + + for (uint32_t j = 0; j < senderOptions.MessageLimit; ++j) + { + Azure::Messaging::EventHubs::Models::EventData event; + event.Body = bodyData; + event.Properties["Number"] = j; + event.Properties["PartitionID"] + = static_cast(senderOptions.PartitionId); + if (j == senderOptions.MessageLimit) + { + AddEndProperty(event, senderOptions.MessageLimit); + } + + { + auto batchAddMessageSpan{CreateStressSpan("SendEventsToPartition::BatchTryAddMessage")}; + if (!batch.TryAddMessage(event)) + { + if (batch.CurrentSize() == 0) + { + std::cerr << "Single message could not fit in batch"; + throw std::runtime_error("Single message could not fit in batch"); + } + auto sendBatchSpan{CreateStressSpan("SendBatch")}; + { + producerClient->Send(batch, context); + } + batch = producerClient->CreateBatch(batchOptions); + j -= 1; // Retry adding the same message. + } + } + } + if (batch.CurrentSize() > 0) + { + auto sendBatchSpan{CreateStressSpan("SendBatch")}; + { + sendBatchSpan.first->AddEvent("Send events", {{"event count", senderOptions.MessageLimit}}); + producerClient->Send(batch, context); + } + } + { + auto getPartitionPropertiesSpan{CreateStressSpan("GetPartitionProperties")}; + auto afterSendProps + = producerClient->GetPartitionProperties(senderOptions.PartitionId, context); + getPartitionPropertiesSpan.first->AddEvent( + "After Properties", {{"sequenceNumber", beforeSendProps.LastEnqueuedSequenceNumber}}); + + Azure::Messaging::EventHubs::Models::StartPosition afterStartPosition; + afterStartPosition.Inclusive = false; + afterStartPosition.SequenceNumber = beforeSendProps.LastEnqueuedSequenceNumber; + + std::cout << "[END] Sending " << senderOptions.MessageLimit << " messages to partition " + << senderOptions.PartitionId << " with messages of size " + << senderOptions.NumberOfExtraBytes << "b" << std::endl; + return std::make_pair(afterStartPosition, afterSendProps); + } + } + +private: + static void AddEndProperty( + Azure::Messaging::EventHubs::Models::EventData& event, + uint64_t expectedCount) + { + event.Properties["End"] = expectedCount; + } +}; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/opentelemetry_helpers.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/opentelemetry_helpers.hpp new file mode 100644 index 000000000..1c7153cd5 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/opentelemetry_helpers.hpp @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once +#include + +#include +#include +#include +#include +#include + +opentelemetry::nostd::shared_ptr GetLogger(); + +opentelemetry::nostd::shared_ptr GetTracer(); + +std::pair, opentelemetry::trace::Scope> +CreateStressSpan(std::string const& name); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/scope_guard.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/scope_guard.hpp new file mode 100644 index 000000000..c9f0761bf --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/inc/scope_guard.hpp @@ -0,0 +1,184 @@ +/* + * Created on: 13/02/2018 + * Author: ricab + * + * See README.md for documentation of this header's public interface. + */ + +#ifndef SCOPE_GUARD_HPP_ +#define SCOPE_GUARD_HPP_ + +#include +#include + +#if __cplusplus >= 201703L +#define SG_NODISCARD [[nodiscard]] +#ifdef SG_REQUIRE_NOEXCEPT_IN_CPP17 +#define SG_REQUIRE_NOEXCEPT +#endif +#else +#define SG_NODISCARD +#endif + +namespace sg { +namespace detail { + /* --- Some custom type traits --- */ + + // Type trait determining whether a type is callable with no arguments + template struct is_noarg_callable_t : public std::false_type + { + }; // in general, false + + template + struct is_noarg_callable_t()())> : public std::true_type + { + }; // only true when call expression valid + + // Type trait determining whether a no-argument callable returns void + template + struct returns_void_t : public std::is_same()())> + { + }; + + /* Type trait determining whether a no-arg callable is nothrow invocable if + required. This is where SG_REQUIRE_NOEXCEPT logic is encapsulated. */ + template + struct is_nothrow_invocable_if_required_t + : public +#ifdef SG_REQUIRE_NOEXCEPT + std::is_nothrow_invocable /* Note: _r variants not enough to + confirm void return: any return can be + discarded so all returns are + compatible with void */ +#else + std::true_type +#endif + { + }; + + // logic AND of two or more type traits + template struct and_t : public and_t> + { + }; // for more than two arguments + + template + struct and_t : public std::conditional::type + { + }; // for two arguments + + // Type trait determining whether a type is a proper scope_guard callback. + template + struct is_proper_sg_callback_t : public and_t< + is_noarg_callable_t, + returns_void_t, + is_nothrow_invocable_if_required_t, + std::is_nothrow_destructible> + { + }; + + /* --- The actual scope_guard template --- */ + + template < + typename Callback, + typename = typename std::enable_if::value>::type> + class scope_guard; + + /* --- Now the friend maker --- */ + + template + detail::scope_guard make_scope_guard(Callback&& callback) noexcept( + std::is_nothrow_constructible::value); /* +we need this in the inner namespace due to MSVC bugs preventing +sg::detail::scope_guard from befriending a sg::make_scope_guard +template instance in the parent namespace (see https://is.gd/xFfFhE). */ + + /* --- The template specialization that actually defines the class --- */ + + template class SG_NODISCARD scope_guard final { + public: + typedef Callback callback_type; + + scope_guard(scope_guard&& other) noexcept( + std::is_nothrow_constructible::value); + + ~scope_guard() noexcept; // highlight noexcept dtor + + void dismiss() noexcept; + + public: + scope_guard() = delete; + scope_guard(const scope_guard&) = delete; + scope_guard& operator=(const scope_guard&) = delete; + scope_guard& operator=(scope_guard&&) = delete; + + private: + explicit scope_guard(Callback&& callback) noexcept( + std::is_nothrow_constructible::value); /* + meant for friends only */ + + friend scope_guard make_scope_guard(Callback&&) noexcept( + std::is_nothrow_constructible::value); /* +only make_scope_guard can create scope_guards from scratch (i.e. non-move) +*/ + + private: + Callback m_callback; + bool m_active; + }; + +} // namespace detail + +/* --- Now the single public maker function --- */ + +using detail::make_scope_guard; // see comment on declaration above + +} // namespace sg + +//////////////////////////////////////////////////////////////////////////////// +template +sg::detail::scope_guard::scope_guard(Callback&& callback) noexcept( + std::is_nothrow_constructible::value) + : m_callback(std::forward(callback)) /* use () instead of {} because + of DR 1467 (https://is.gd/WHmWuo), which still impacts older compilers + (e.g. GCC 4.x and clang <=3.6, see https://godbolt.org/g/TE9tPJ and + https://is.gd/Tsmh8G) */ + , + m_active{true} +{ +} + +//////////////////////////////////////////////////////////////////////////////// +template +sg::detail::scope_guard::scope_guard::~scope_guard() noexcept /* +need the extra injected-class-name here to make different compilers happy */ +{ + if (m_active) + m_callback(); +} + +//////////////////////////////////////////////////////////////////////////////// +template +sg::detail::scope_guard::scope_guard(scope_guard&& other) noexcept( + std::is_nothrow_constructible::value) + : m_callback(std::forward(other.m_callback)) // idem + , + m_active{std::move(other.m_active)} +{ + other.m_active = false; +} + +//////////////////////////////////////////////////////////////////////////////// +template inline void sg::detail::scope_guard::dismiss() noexcept +{ + m_active = false; +} + +//////////////////////////////////////////////////////////////////////////////// +template +inline auto sg::detail::make_scope_guard(Callback&& callback) noexcept( + std::is_nothrow_constructible::value) -> detail::scope_guard +{ + return detail::scope_guard{std::forward(callback)}; +} + +#endif /* SCOPE_GUARD_HPP_ */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/batch_stress_tests.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/batch_stress_tests.cpp new file mode 100644 index 000000000..8e13a6e22 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/batch_stress_tests.cpp @@ -0,0 +1,357 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "batch_stress_tests.hpp" + +#include "opentelemetry_helpers.hpp" +#include "scope_guard.hpp" + +#include + +using namespace Azure::Messaging::EventHubs; + +namespace trace_sdk = opentelemetry::sdk::trace; +namespace trace = opentelemetry::trace; +namespace logs_sdk = opentelemetry::sdk::logs; +namespace logs = opentelemetry::logs; + +namespace { + +} // namespace +BatchStressTest::BatchStressTest() {} + +namespace argagg { namespace convert { + + // Convert a string to a std::chrono::system_clock::duration + template <> std::chrono::system_clock::duration arg(const char* s) + { + std::string str(s); + std::string number; + std::string unit; + for (auto c : str) + { + if (std::isdigit(c)) + { + number.push_back(c); + } + else + { + unit.push_back(c); + } + } + auto value = std::stoll(number); + if (unit == "s") + { + return std::chrono::seconds(value); + } + else if (unit == "m") + { + return std::chrono::minutes(value); + } + else if (unit == "h") + { + return std::chrono::hours(value); + } + else if (unit == "ms") + { + return std::chrono::milliseconds(value); + } + else if (unit == "us") + { + return std::chrono::microseconds(value); + } + else + { + throw std::invalid_argument("Invalid duration unit: " + unit); + } + } +}} // namespace argagg::convert + +const std::string& BatchStressTest::GetStressScenarioName() { return m_scenarioName; } + +std::vector BatchScenarioOptions{ + {"NumberToSend", {"-c", "--send"}, "Number of events to send", 1}, + {"BatchSize", + {"-r", "--receive"}, + "Size to request each time we call ReceiveEvents(). Higher batch sizes will require higher " + "amounts of memory for this test", + 1}, + {"BatchDuration", {"-t", "--timeout"}, "Time to wait for each batch (ie: 1m, 30s, etc...)", 1}, + {"Prefetch", + {"-f", "--prefetch"}, + "Number of events to set for the prefetch. Negative numbers disable prefetch altogether. 0 " + "uses the default for the package", + 1}, + {"Rounds", + {"-n", "--rounds"}, + "Number of rounds to run with these parameters. -1 means MAX_INT", + 1}, + + {"PaddingBytes", + {"-P", "--padding"}, + "Extra number of bytes to add onto each message body.", + 1}, + {"PartitionId", + {"-p", "--partition"}, + "Partition ID to send events to and receive events from", + 1}, + {"MaxTimeouts", + {"-m", "--maxtimeouts"}, + "Number of consecutive receive timeouts allowed before quitting", + 0}, + {"UseSasCredential", + {"-S", "--useSasCredential"}, + "Use a SAS credential for authentication", + 0}, + {"SleepAfter", {"--sleepAfter"}, "Time to sleep after test completes", 1}, +}; + +// Default option values. + +// Note that the DefaultNumberToSend value is artificially reduced to 100 until the +// MessageSender::Open code fully supports Open. +constexpr const std::uint32_t DefaultNumberToSend = /*1000000*/ 100; +constexpr const std::uint32_t DefaultBatchSize = 1000; +constexpr const std::uint32_t DefaultPrefetch = 0; +constexpr const auto DefaultDuration = std::chrono::seconds(60); +constexpr const std::uint32_t DefaultRounds = 100; +constexpr const std::uint32_t DefaultPaddingBytes = 1024; +const std::string DefaultPartitionId{"0"}; // constexpr std::string is a c++17 feature. +constexpr const std::uint32_t DefaultMaxTimeouts = 10; + +const std::vector& BatchStressTest::GetScenarioOptions() +{ + return BatchScenarioOptions; +} + +void BatchStressTest::Initialize(argagg::parser_results const& parserResults) +{ + m_numberToSend = parserResults["NumberToSend"].as(DefaultNumberToSend); + m_batchSize = parserResults["BatchSize"].as(DefaultBatchSize); + m_batchDuration + = parserResults["BatchDuration"].as(DefaultDuration); + m_prefetchCount = parserResults["Prefetch"].as(DefaultPrefetch); + m_rounds = parserResults["Rounds"].as(DefaultRounds); + m_paddingBytes = parserResults["PaddingBytes"].as(DefaultPaddingBytes); + m_partitionId = parserResults["PartitionId"].as(DefaultPartitionId); + m_maxTimeouts = parserResults["MaxTimeouts"].as(DefaultMaxTimeouts); + m_verbose = parserResults["verbose"].as(false); + m_useSasCredential = parserResults["UseSasCredential"].as(true); + if (m_rounds == 0xffffffff) + { + m_rounds = (std::numeric_limits::max)(); + } + + auto span{CreateStressSpan("Initialize")}; + span.first->SetAttribute("NumberToSend", m_numberToSend); + span.first->SetAttribute("BatchSize", m_batchSize); + span.first->SetAttribute("BatchDuration", m_batchDuration.count()); + span.first->SetAttribute("Prefetch", m_prefetchCount); + span.first->SetAttribute("Rounds", m_rounds); + span.first->SetAttribute("PaddingBytes", m_paddingBytes); + span.first->SetAttribute("PartitionId", m_partitionId); + span.first->SetAttribute("MaxTimeouts", m_maxTimeouts); + span.first->SetAttribute("Verbose", m_verbose); + span.first->SetAttribute("UseSasCredential", m_useSasCredential); + + // m_sleepAfter = + // parserResults["SleepAfter"].as(std::chrono::seconds(0)); + + m_eventHubName = Azure::Core::_internal::Environment::GetVariable("EVENTHUB_NAME"); + + if (m_eventHubName.empty()) + { + GetLogger()->Fatal("Could not find required environment variable EVENTHUB_NAME"); + std::cerr << "Missing required environment variable EVENTHUB_NAME" << std::endl; + throw std::runtime_error("Missing environment variable, aborting."); + } + + m_eventHubNamespace = Azure::Core::_internal::Environment::GetVariable("EVENTHUBS_HOST"); + if (m_eventHubNamespace.empty()) + { + GetLogger()->Fatal("Could not find required environment variable EVENTHUBS_HOST"); + std::cerr << "Missing required environment variable EVENTHUBS_HOST" << std::endl; + throw std::runtime_error("Missing environment variable, aborting."); + } + + if (m_useSasCredential) + { + m_eventHubConnectionString + = Azure::Core::_internal::Environment::GetVariable("EVENTHUB_CONNECTION_STRING"); + if (m_eventHubConnectionString.empty()) + { + std::cerr << "Missing required environment variable EVENTHUB_CONNECTION_STRING" << std::endl; + GetLogger()->Fatal("Could not find required environment variable EVENTHUB_NAME"); + throw std::runtime_error("Missing environment variable, aborting."); + } + m_checkpointStoreConnectionString + = Azure::Core::_internal::Environment::GetVariable("CHECKPOINT_STORE_CONNECTION_STRING"); + } +} + +void BatchStressTest::Run() +{ + std::cout << "Run " << std::endl; + auto sendOutput = SendMessages(); + std::cout << "Starting receive tests for partition " << m_partitionId << std::endl; + std::cout << " Start position: " << sendOutput.first << std::endl + << " End position: " << sendOutput.second.LastEnqueuedSequenceNumber << std::endl; + + ReceiveMessages(sendOutput.first); +} +void BatchStressTest::Cleanup() {} + +std::pair< + Azure::Messaging::EventHubs::Models::StartPosition, + Azure::Messaging::EventHubs::Models::EventHubPartitionProperties> +BatchStressTest::SendMessages() +{ + std::unique_ptr producerClient; + if (m_useSasCredential) + { + producerClient = std::make_unique( + m_eventHubConnectionString, m_eventHubName); + } + else + { + producerClient = std::make_unique( + m_eventHubNamespace, + m_eventHubName, + std::make_shared()); + } + Azure::Core::Context context; + auto scopeGuard{ + sg::make_scope_guard([&context, &producerClient]() { producerClient->Close(context); })}; + + try + { + EventSenderOptions senderOptions; + senderOptions.PartitionId = m_partitionId; + senderOptions.MessageLimit = m_numberToSend; + senderOptions.NumberOfExtraBytes = m_paddingBytes; + auto sendEventsResult + = EventSender::SendEventsToPartition(producerClient, senderOptions, context); + producerClient->Close(context); + + return std::make_pair(sendEventsResult.first, sendEventsResult.second); + } + catch (std::exception const& ex) + { + GetTracer()->GetCurrentSpan()->AddEvent( + "Exception received", {{trace::SemanticConventions::kExceptionMessage, ex.what()}}); + std::cerr << "Exception " << ex.what(); + throw; + } +} +void BatchStressTest::ReceiveMessages( + Azure::Messaging::EventHubs::Models::StartPosition const& startPosition) +{ + auto span{CreateStressSpan("ReceiveMessages")}; + + try + { + Azure::Core::Context context; + ConsumerClientOptions clientOptions; + clientOptions.ApplicationID = "StressConsumerClient"; + + std::unique_ptr consumerClient; + if (m_useSasCredential) + { + consumerClient = std::make_unique( + m_eventHubConnectionString, m_eventHubName, DefaultConsumerGroup, clientOptions); + } + else + { + consumerClient = std::make_unique( + m_eventHubNamespace, + m_eventHubName, + std::make_shared()); + } + + { + auto getPartitionPropertiesSpan{ + CreateStressSpan("ReceiveMessages::GetPartitionProperties to warm up connection")}; + auto consumerProperties = consumerClient->GetEventHubProperties(context); + } + + for (auto round = 0u; round < m_rounds; round += 1) + { + std::cout << "Round " << round << std::endl; + auto consumeForTesterSpan{CreateStressSpan("ConsumeForBatchTester")}; + consumeForTesterSpan.first->SetAttribute("Round", round); + ConsumeForBatchTester(round, *consumerClient, startPosition, context); + } + + consumerClient->Close(context); + } + catch (std::exception const& ex) + { + GetTracer()->GetCurrentSpan()->AddEvent( + "Exception received", {{trace::SemanticConventions::kExceptionMessage, ex.what()}}); + std::cerr << "Exception " << ex.what(); + throw; + } +} + +void BatchStressTest::ConsumeForBatchTester( + uint32_t round, + ConsumerClient& client, + Models::StartPosition const& startPosition, + Azure::Core::Context const& context) const +{ + std::unique_ptr partitionClient; + { + auto span{CreateStressSpan("ConsumeForBatchTester::CreatePartitionClient")}; + PartitionClientOptions partitionOptions; + partitionOptions.StartPosition = startPosition; + partitionOptions.Prefetch = m_prefetchCount; + partitionClient = std::make_unique( + client.CreatePartitionClient(m_partitionId, partitionOptions)); + std::cout << "[r: " << round << "/" << m_rounds << "p: " << m_partitionId + << "] Starting to receive messages from partition" << std::endl; + } + size_t total = 0; + uint32_t numCancels = 0; + constexpr const uint32_t cancelLimit = 5; + + do + { + auto duration = std::chrono::system_clock::now() + m_batchDuration; + auto receiveContext = context.WithDeadline(duration); + + try + { + auto span{CreateStressSpan("ConsumeForBatchTester::ReceiveEvents")}; + auto events = partitionClient->ReceiveEvents(m_batchSize, receiveContext); + total += events.size(); + std::cout << "Total: " << total << std::endl; + if (total >= m_numberToSend) + { + break; + } + } + catch (Azure::Messaging::EventHubs::EventHubsException& ex) + { + std::cerr << "Exception thrown while receiving messages." << ex.what() << std::endl; + if (!ex.IsTransient) + { + std::cerr << "Error is not transient, aborting test." << std::endl; + throw; + } + } + catch (Azure::Core::OperationCancelledException&) + { + numCancels += 1; + if (numCancels > cancelLimit) + { + std::cerr << "cancellation errors were received " << numCancels + << " times in a row. Stoping test." << std::endl; + throw std::runtime_error("Too many cancellations received in a row, aborting test."); + } + else + { + std::cout << "received " << total << "/" << m_numberToSend << "then received error"; + } + } + } while (true); +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/opentelemetry_helpers.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/opentelemetry_helpers.cpp new file mode 100644 index 000000000..c126e6774 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/src/scenarios/src/opentelemetry_helpers.cpp @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#include "opentelemetry_helpers.hpp" + +#include "eventhubs_stress_scenarios.hpp" + +opentelemetry::nostd::shared_ptr GetLogger() +{ + auto logger{opentelemetry::logs::Provider::GetLoggerProvider()->GetLogger(EventHubsLoggerName)}; + return logger; +} + +opentelemetry::nostd::shared_ptr GetTracer() +{ + return opentelemetry::trace::Provider::GetTracerProvider()->GetTracer(EventHubsLoggerName); +} + +std::pair, opentelemetry::trace::Scope> +CreateStressSpan(std::string const& name) +{ + auto tracer = GetTracer(); + opentelemetry::trace::StartSpanOptions options; + options.parent = tracer->GetCurrentSpan()->GetContext(); + options.kind = opentelemetry::trace::SpanKind::kClient; + auto newSpan = tracer->StartSpan(name, options); + + auto scope{tracer->WithActiveSpan(newSpan)}; + + return std::make_pair< + opentelemetry::nostd::shared_ptr, + opentelemetry::trace::Scope>(std::move(newSpan), std::move(scope)); +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/stress-test-resources.bicep b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/stress-test-resources.bicep index 3d28a82a7..a70ffe1b0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/stress-test-resources.bicep +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/stress-test-resources.bicep @@ -124,5 +124,5 @@ resource storageAccountName_default_container 'Microsoft.Storage/storageAccounts } output EVENTHUB_NAME string = eventHubName -output EVENTHUB_CONNECTION_STRING string = listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', namespaceName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString -output CHECKPOINTSTORE_STORAGE_CONNECTION_STRING string = 'DefaultEndpointsProtocol=https;AccountName=${storageAccountName};AccountKey=${listKeys(storageAccount.id, storageApiVersion).keys[0].value};EndpointSuffix=${storageEndpointSuffix}' +output EVENTHUB_CONNECTION_STRING string = '"${listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', namespaceName, 'RootManageSharedAccessKey'), apiVersion).primaryConnectionString}"' +output CHECKPOINTSTORE_STORAGE_CONNECTION_STRING string = '"DefaultEndpointsProtocol=https;AccountName=${storageAccountName};AccountKey=${listKeys(storageAccount.id, storageApiVersion).keys[0].value};EndpointSuffix=${storageEndpointSuffix}"' diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/templates/deploy-job.yaml b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/templates/deploy-job.yaml index 2802f08f3..d063b76ed 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/templates/deploy-job.yaml +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/templates/deploy-job.yaml @@ -6,17 +6,29 @@ metadata: labels: chaos: "{{ default false .Stress.chaos }}" spec: + shareProcessNamespace: true containers: + - name: otel-collector +# image: mcr.microsoft.com/oss/otel/opentelemetry-collector-contrib:0.94.0 + image: stresspgs7b6dif73rup6.azurecr.io/stress/opentelemetry-collector-contrib-shell:0.94.0 + imagePullPolicy: Always + resources: + limits: + memory: 500Mi + cpu: "0.5" + env: + - name: APPLICATIONINSIGHTS_CONNECTION_STRING + value: "TO BE FILLED IN LATER." - name: main image: {{ .Stress.imageTag }} imagePullPolicy: Always - command: - [ - "valgrind", - "--tool=memcheck", - "-s", - "./azure-messaging-eventhubs-stress-test", - ] + command: ['bash', '-c'] + args: + - | + set -a; + source $ENV_FILE; + ./azure-messaging-eventhubs-stress-test produce BatchStressTest; + kill `pidof otelcol-contrib`; {{- include "stress-test-addons.container-env" . | nindent 6 }} resources: limits: diff --git a/sdk/eventhubs/azure-messaging-eventhubs/vcpkg.json b/sdk/eventhubs/azure-messaging-eventhubs/vcpkg.json index a41417053..2e4e6b316 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/vcpkg.json +++ b/sdk/eventhubs/azure-messaging-eventhubs/vcpkg.json @@ -3,6 +3,10 @@ "version-string": "1.0.0", "dependencies": [ "azure-core-cpp", - "azure-core-amqp-cpp" + "azure-core-amqp-cpp", + { + "name": "opentelemetry-cpp", + "features": [ "otlp-http" ] + } ] } diff --git a/sdk/eventhubs/ci.yml b/sdk/eventhubs/ci.yml index ca060b372..ead82c168 100644 --- a/sdk/eventhubs/ci.yml +++ b/sdk/eventhubs/ci.yml @@ -30,8 +30,8 @@ stages: CtestRegex: "azure-messaging-eventhubs.*" LiveTestCtestRegex: "azure-messaging-eventhubs.*" LiveTestTimeoutInMinutes: 120 - LineCoverageTarget: 40 - BranchCoverageTarget: 20 + LineCoverageTarget: 43 + BranchCoverageTarget: 22 Artifacts: - Name: azure-messaging-eventhubs Path: azure-messaging-eventhubs diff --git a/vcpkg.json b/vcpkg.json index 78701eb20..023781996 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -16,6 +16,7 @@ }, { "name": "opentelemetry-cpp", + "features": ["otlp-http"], "platform": "!(windows & !static)", "version>=": "1.3.0" },