diff --git a/CMakePresets.json b/CMakePresets.json index 2c71dc84b..b3f83b004 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -358,6 +358,21 @@ "curl-transport" ] }, + { + "name": "x64-static-release-perftests-asan", + "displayName": "x64 Release static With Perf Tests and samples, plus ASAN", + "inherits": [ + "x64-static", + "release-build", + "enable-tests", + "enable-samples", + "enable-perf", + "enable-address-sanitizer" + ], + "cacheVariables": { + "DISABLE_AZURE_CORE_OPENTELEMETRY": true + } + }, { "name": "x86-msvc-static-debug-perftests", "displayName": "x86 MSVC Debug static With Perf Tests and samples", @@ -480,14 +495,14 @@ { "name": "linux-basic-g++", "displayName": "Linux G++", - "description": "Using compilers: C = /usr/bin/cc, CXX = /usr/bin/c++", + "description": "Using compilers: C = /usr/bin/gcc, CXX = /usr/bin/g++", "binaryDir": "${sourceDir}/out/build/${presetName}", "generator": "Ninja", "hidden": true, "cacheVariables": { "CMAKE_INSTALL_PREFIX": "${sourceDir}/out/install/${presetName}", - "CMAKE_C_COMPILER": "/usr/bin/cc", - "CMAKE_CXX_COMPILER": "/usr/bin/c++" + "CMAKE_C_COMPILER": "/usr/bin/gcc", + "CMAKE_CXX_COMPILER": "/usr/bin/g++" }, "condition": { "type": "equals", @@ -585,6 +600,7 @@ "name": "g++-debug-asan-tests", "inherits": [ "linux-g++-debug-tests", + "enable-tests", "enable-address-sanitizer" ], "displayName": "Linux g++, ASAN+Debug+Tests" diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 81ecdaff6..3cc4e9459 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -8,6 +8,9 @@ ### Bugs Fixed +- Fixed several memory leaks. +- AMQP Link Credits now work as expected. + ### Other Changes ## 1.0.0-beta.5 (2023-11-07) diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/async_operation_queue.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/async_operation_queue.hpp index bf45937e2..8ef8e728d 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/async_operation_queue.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/async_operation_queue.hpp @@ -22,11 +22,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace * the AsyncOperationQueue. WaitForResult will block until a result is available. */ template class AsyncOperationQueue final { - public: AsyncOperationQueue() = default; ~AsyncOperationQueue() = default; + AsyncOperationQueue(const AsyncOperationQueue&) = delete; + AsyncOperationQueue& operator=(const AsyncOperationQueue&) = delete; + + AsyncOperationQueue(AsyncOperationQueue&&) = default; + AsyncOperationQueue& operator=(AsyncOperationQueue&&) = default; + void CompleteOperation(T... operationParameters) { std::unique_lock lock(m_operationComplete); diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/global_state.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/global_state.hpp index a2399483b..c5c179135 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/global_state.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/common/global_state.hpp @@ -5,6 +5,7 @@ #include +#include #include #include #include @@ -34,6 +35,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace std::list> m_pollables; std::mutex m_pollablesMutex; std::thread m_pollingThread; + std::atomic m_activelyPolling; bool m_stopped{false}; public: @@ -47,11 +49,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace void AddPollable(std::shared_ptr pollable); - void RemovePollable(std::shared_ptr pollable) - { - std::lock_guard lock(m_pollablesMutex); - m_pollables.remove(pollable); - } + void RemovePollable(std::shared_ptr pollable); void AssertIdle() { diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/connection_string_credential.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/connection_string_credential.hpp index 81dc70aac..259e81bdf 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/connection_string_credential.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/connection_string_credential.hpp @@ -24,6 +24,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { { ParseConnectionString(connectionString); } + ConnectionStringParser(const ConnectionStringParser&) = default; + ConnectionStringParser& operator=(const ConnectionStringParser&) = default; + ConnectionStringParser(ConnectionStringParser&&) = default; + ConnectionStringParser& operator=(ConnectionStringParser&&) = default; ~ConnectionStringParser() = default; std::string const& GetEndpoint() const { return m_endpoint; } @@ -40,7 +44,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { std::string m_sharedAccessKey; std::string m_uri; std::string m_hostName; - uint16_t m_port; + uint16_t m_port{}; std::string m_entityPath; }; diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_header.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_header.hpp index 8009f60fa..8cf062603 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_header.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_header.hpp @@ -23,11 +23,12 @@ namespace Azure { namespace Core { namespace _internal { using type = BasicUniqueHandle; }; }}} // namespace Azure::Core::_internal - -namespace Azure { namespace Core { namespace Amqp { namespace Models { +namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _detail { using UniqueMessageHeaderHandle = Azure::Core::_internal::UniqueHandle; +}}}}} // namespace Azure::Core::Amqp::Models::_detail +namespace Azure { namespace Core { namespace Amqp { namespace Models { /** * @brief The message header section carries standard delivery details about the transfer of a * message through the AMQP network. @@ -37,10 +38,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ struct MessageHeader final { - + /** @brief Default constructor. */ MessageHeader() = default; + /** @brief Destructor. */ ~MessageHeader() = default; + /** @brief Move constructor.*/ + MessageHeader(MessageHeader&& other) = default; + + /** @brief Move assignment operator.*/ + MessageHeader& operator=(MessageHeader&& other) = default; + + /** @brief Copy constructor.*/ + MessageHeader(MessageHeader const& other) = default; + + /** @brief Copy assignment operator.*/ + MessageHeader& operator=(MessageHeader const& other) = default; + /** @brief Compare two AMQP Message Headers for equality. * * @param that - the AMQP Message Header to compare to. @@ -126,8 +140,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace */ struct MessageHeaderFactory { - static MessageHeader FromUamqp(UniqueMessageHeaderHandle const& properties); - static UniqueMessageHeaderHandle ToUamqp(MessageHeader const& properties); + static MessageHeader FromUamqp(_detail::UniqueMessageHeaderHandle const& properties); + static _detail::UniqueMessageHeaderHandle ToUamqp(MessageHeader const& properties); }; }}}}} // namespace Azure::Core::Amqp::Models::_internal diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_message.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_message.hpp index 0131ed657..8f49ad009 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_message.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_message.hpp @@ -101,13 +101,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { * * By default, AMQP uses 0, however services can override this to * express additional semantics about the message payload. + * + * For more information, see [AMQP Message + * Format](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format). */ uint32_t MessageFormat = AmqpDefaultMessageFormatValue; /** @brief The header for the message. * * For more information, see [AMQP Message - * Format](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format). + * Header](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header) + * */ MessageHeader Header; @@ -141,8 +145,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { /** @brief Delivery Tag for the message. * - * For more information, see [AMQP Application - * Properties](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties). + * For more information, see [AMQP Transport + * Transfer](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-delivery-tag). */ AmqpValue DeliveryTag; diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_properties.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_properties.hpp index 3f37d8614..73c14ebd7 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_properties.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_properties.hpp @@ -37,9 +37,30 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ struct MessageProperties final { + /** @brief Default constructor. + * + * Creates an empty MessageProperties object. + */ MessageProperties() = default; + + /** @brief Destructor. + * + * Destroys the MessageProperties object. + */ ~MessageProperties() = default; + /** @brief Move Constructor. */ + MessageProperties(MessageProperties&& other) = default; + + /** @brief Move Assignment operator. */ + MessageProperties& operator=(MessageProperties&& other) = default; + + /** @brief Copy Constructor. */ + MessageProperties(MessageProperties const& other) = default; + + /** @brief Copy Assignment operator. */ + MessageProperties& operator=(MessageProperties const& other) = default; + /** @brief The message-id, if set, uniquely identifies a message within the message system. * The message producer is usually responsible for setting the message-id in such a way that * it is assured to be globally unique. A broker MAY discard a message as a duplicate if the diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp index 5f22c79e5..0bfe2d43f 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp @@ -5,6 +5,8 @@ #include "amqp_header.hpp" +#include +#include #include #include @@ -32,6 +34,7 @@ namespace Azure { namespace Core { namespace _internal { namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _detail { using UniqueAmqpValueHandle = Azure::Core::_internal::UniqueHandle; + std::ostream& operator<<(std::ostream& os, AMQP_VALUE_DATA_TAG const* value); }}}}} // namespace Azure::Core::Amqp::Models::_detail namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal { @@ -111,6 +114,29 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { AmqpValue() noexcept; ~AmqpValue(); + /** @brief Construct an AMQP Value from an existing AMQP Value + * @param that - source value to copy. + */ + AmqpValue(AmqpValue const& that) noexcept; + + /** @brief Move an AMQP Value to another existing AMQP Value + * @param that - source value to move. + */ + AmqpValue(AmqpValue&& that) noexcept; + + /** @brief Copy an AMQP value to the current AMQP value. + * + * @param that the other AMQP Value to copy. + * @returns "this". + */ + AmqpValue& operator=(AmqpValue const& that); + /** @brief Move an AMQP value to the current AMQP value. + * + * @param that the other AMQP Value to move. + * @returns "this". + */ + AmqpValue& operator=(AmqpValue&& that) noexcept; + /** @brief Construct an AMQP boolean value. * * Defined in [AMQP Core Types @@ -282,16 +308,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpValue(Azure::Core::Uuid const& value); - /** @brief Construct an AMQP Value from an existing AMQP Value - * @param that - source value to copy. - */ - AmqpValue(AmqpValue const& that) noexcept; - - /** @brief Move an AMQP Value to another existing AMQP Value - * @param that - source value to move. - */ - AmqpValue(AmqpValue&& that) noexcept; - // Interoperability functions for uAMQP /// @cond @@ -304,29 +320,29 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ operator AMQP_VALUE_DATA_TAG*() const; + /** @brief Interoperability helper function which releases the internal AmqpValue. + * + * @returns uAMQP AMQP_VALUE object. + * + * @remarks This is an internal operator which should not be called by customers. + */ + AMQP_VALUE_DATA_TAG* Release(); + /** @brief Interoperability helper function which creates an AmqpValue from a uAMQP * AMQP_VALUE object. * * @param value source uAMQP AMQP_VALUE object. * * @remarks This is an internal operator which should not be called by customers. + * + * @remarks Note that this does NOT capture the passed in AMQP_VALUE object, instead it clones + * the underlying AMQP_VALUE. This is why it takes a UniqueAmqpValueHandle as a parameter + * instead of a raw AMQP_VALUE - that ensures that someone will free the underlying AMQP_VALUE + * if needed. */ - AmqpValue(AMQP_VALUE_DATA_TAG* value); + AmqpValue(_detail::UniqueAmqpValueHandle const& value); /// @endcond - /** @brief Copy an AMQP value to the current AMQP value. - * - * @param that the other AMQP Value to copy. - * @returns "this". - */ - AmqpValue& operator=(AmqpValue const& that); - /** @brief Move an AMQP value to the current AMQP value. - * - * @param that the other AMQP Value to move. - * @returns "this". - */ - AmqpValue& operator=(AmqpValue&& that) noexcept; - /** @brief Equality comparison operator. * @param that - Value to compare to this value. * @returns true if the that is equal to this. @@ -565,6 +581,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { static AmqpValue Deserialize(uint8_t const* data, size_t size); private: + AmqpValue(AMQP_VALUE_DATA_TAG*) = delete; _detail::UniqueAmqpValueHandle m_value; }; std::ostream& operator<<(std::ostream& os, AmqpValue const& value); @@ -586,7 +603,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace AmqpCollectionBase(initializer_type const& initializer) : m_value{initializer} {} AmqpCollectionBase(T initializer) : m_value{initializer} {} - AmqpCollectionBase() {} + AmqpCollectionBase(){}; + + // Copy constructor + AmqpCollectionBase(const AmqpCollectionBase& other) = default; + + // Copy assignment operator + AmqpCollectionBase& operator=(const AmqpCollectionBase& other) = default; + + // Move constructor + AmqpCollectionBase(AmqpCollectionBase&& other) noexcept = default; + + // Move assignment operator + AmqpCollectionBase& operator=(AmqpCollectionBase&& other) = default; + + ~AmqpCollectionBase() = default; public: /** @brief Returns the underlying value. @@ -594,11 +625,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace explicit operator T const &() const { return m_value; } /** @brief Convert this collection type to an AMQP value.*/ - explicit operator AmqpValue() const { return static_cast(*this).get(); } + AmqpValue AsAmqpValue() const { return AmqpValue{UniqueAmqpValueHandle{*this}}; } /** @brief Returns the size of the underlying value.*/ inline typename T::size_type size() const { return m_value.size(); } + /// @brief Array accessor operator. + /// @param pos Position of an element in the container. + /// @return element value. const typename T::value_type& operator[](const typename T::size_type pos) const noexcept { return m_value.operator[](pos); @@ -619,6 +653,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace /** @brief Returns true if the underlying value is empty.*/ bool empty() const noexcept { return m_value.empty(); } + protected: /** * @brief Convert an AmqpCollectionBase instance to a uAMQP AMQP_VALUE. * @@ -626,7 +661,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace * implementation. * */ - operator UniqueAmqpValueHandle() const; + virtual operator UniqueAmqpValueHandle() const; }; }}}}} // namespace Azure::Core::Amqp::Models::_detail @@ -643,6 +678,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { /** @brief Construct a new AmqpArray object with an initializer list. */ AmqpArray(initializer_type const& values); + /** @brief Destroy an array. */ + virtual ~AmqpArray() = default; + + /** @brief Copy constructor */ + AmqpArray(const AmqpArray& other) = default; + + /** @brief Copy assignment operator */ + AmqpArray& operator=(const AmqpArray& other) = default; + + /** @brief Move constructor */ + AmqpArray(AmqpArray&& other) noexcept = default; + + /** @brief Move assignment operator */ + AmqpArray& operator=(AmqpArray&& other) noexcept = default; + /** @brief Construct a new AmqpArray object from an existing uAMQP AMQP_VALUE item * @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is * responsible for freeing that object. @@ -674,6 +724,18 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { { } + /** @brief Move Constructor */ + AmqpMap(AmqpMap&& other) noexcept = default; + + /** @brief Move assignment operator */ + AmqpMap& operator=(AmqpMap&& other) noexcept = default; + + /** @brief Copy Constructor */ + AmqpMap(const AmqpMap& other) = default; + + /** @brief Copy assignment operator */ + AmqpMap& operator=(const AmqpMap& other) = default; + /** @brief Construct a new AmqpMap object from an existing uAMQP AMQP_VALUE item * @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is * responsible for freeing that object. @@ -685,6 +747,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpMap(AMQP_VALUE_DATA_TAG* const value); + virtual ~AmqpMap() = default; + /** @brief Access an element in the map. * * @return a reference to the value associated with the specified key. @@ -714,12 +778,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { class AmqpList final : public _detail::AmqpCollectionBase, AmqpList> { public: AmqpList() : AmqpCollectionBase(){}; + virtual ~AmqpList() = default; /** @brief Construct a new AmqpList object with an initializer list. */ AmqpList(std::initializer_list::value_type> const& values) : AmqpCollectionBase(values) { } + /** @brief Copy Constructor */ + AmqpList(const AmqpList& other) = default; + + /** @brief Copy assignment operator */ + AmqpList& operator=(const AmqpList& other) = default; + + /** @brief Move Constructor */ + AmqpList(AmqpList&& other) noexcept = default; + + /** @brief Move assignment operator */ + AmqpList& operator=(AmqpList&& other) noexcept = default; + /** @brief Construct a new AmqpList object from an existing uAMQP AMQP_VALUE item * * @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is @@ -744,11 +821,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { : public _detail::AmqpCollectionBase, AmqpBinaryData> { public: AmqpBinaryData() : AmqpCollectionBase(){}; + virtual ~AmqpBinaryData() = default; /** @brief Construct a new AmqpBinaryData object with an initializer list. */ AmqpBinaryData(initializer_type const& values) : AmqpCollectionBase(values){}; /** @brief Construct a new AmqpBinaryData from a vector of bytes. */ AmqpBinaryData(std::vector const& values) : AmqpCollectionBase(values){}; + /** @brief Copy constructor */ + AmqpBinaryData(const AmqpBinaryData& other) = default; + + /** @brief Copy assignment operator */ + AmqpBinaryData& operator=(const AmqpBinaryData& other) = default; + + /** @brief Move Constructor */ + AmqpBinaryData(AmqpBinaryData&& other) noexcept = default; + + /** @brief Move assignment operator */ + AmqpBinaryData& operator=(AmqpBinaryData&& other) noexcept = default; + /** @brief Assign a vector of bytes to the current AmqpBinaryData. */ AmqpBinaryData& operator=(std::vector const& values) { @@ -780,6 +870,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { class AmqpSymbol final : public _detail::AmqpCollectionBase { public: AmqpSymbol() : AmqpCollectionBase(){}; + virtual ~AmqpSymbol() = default; + /** @brief Construct a new AmqpSymbol object with an initializer list. */ AmqpSymbol(std::string const& values) : AmqpCollectionBase(values){}; @@ -789,6 +881,18 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { /** @brief Construct a new AmqpSymbol object from a constant string value. */ AmqpSymbol(const char* const values) : AmqpCollectionBase(values){}; + /** @brief Copy constructor */ + AmqpSymbol(const AmqpSymbol& other) = default; + + /** @brief Copy Assignment operator */ + AmqpSymbol& operator=(const AmqpSymbol& other) = default; + + /** @brief Move constructor */ + AmqpSymbol(AmqpSymbol&& other) noexcept = default; + + /** @brief Move Assignment operator */ + AmqpSymbol& operator=(AmqpSymbol&& other) noexcept = default; + /** @brief Construct a new AmqpSymbol object from an existing uAMQP AMQP_VALUE item * * @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is @@ -830,6 +934,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { public: AmqpTimestamp(); + ~AmqpTimestamp() = default; + /** @brief Construct a new AmqpTimestamp object . */ AmqpTimestamp(std::chrono::milliseconds const& values); @@ -845,10 +951,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpTimestamp(AMQP_VALUE_DATA_TAG* const value); - /** - * @brief Convert an existing AmqpSymbol to an AmqpValue. - */ - explicit operator AmqpValue() const; + /** @brief Copy constructor */ + AmqpTimestamp(const AmqpTimestamp& other) = default; + + /** @brief Copy Assignment operator */ + AmqpTimestamp& operator=(const AmqpTimestamp& other) = default; + + /** @brief Move constructor */ + AmqpTimestamp(AmqpTimestamp&& other) noexcept = default; + + /** @brief Move Assignment operator */ + AmqpTimestamp& operator=(AmqpTimestamp&& other) noexcept = default; /** * @brief Convert an AmqpSymbol instance to a uAMQP AMQP_VALUE. @@ -861,6 +974,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ operator _detail::UniqueAmqpValueHandle() const; + /** @brief Convert this AmqpTimestamp to an AmqpValue. + * + * @return An AmqpValue containing this AmqpTimestamp. + */ + AmqpValue AsAmqpValue() const { return AmqpValue{_detail::UniqueAmqpValueHandle{*this}}; } + /** * @brief Convert an AmqpTimestamp instance to a std::chrono::milliseconds. */ @@ -885,17 +1004,37 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { * @note The AMQP Composite type representation does NOT include the underlying field names, * just the field values. * - * */ class AmqpComposite final : public _detail::AmqpCollectionBase, AmqpComposite> { public: /** @brief Construct a new AmqpComposite object. */ AmqpComposite() : AmqpCollectionBase(){}; + virtual ~AmqpComposite() = default; /** @brief Construct a new AmqpComposite object with an initializer list. */ AmqpComposite(AmqpValue const& descriptor, std::initializer_list const& values); + /** @brief Construct a new AmqpComposite object from another */ + AmqpComposite(const AmqpComposite& other) = default; + + /** @brief Copy assignment operator. + * @param other composite value to copy. + * @returns reference to this composite value. + */ + AmqpComposite& operator=(const AmqpComposite& other) = default; + + /** @brief Move constructor. + * @param other composite value to move. + */ + AmqpComposite(AmqpComposite&& other) noexcept = default; + + /** @brief Move assignment operator. + * @param other composite value to move. + * @returns reference to this composite value. + */ + AmqpComposite& operator=(AmqpComposite&& other) noexcept = default; + /** @brief Construct a new AmqpComposite object from an existing uAMQP AMQP_VALUE item * @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is * responsible for freeing that object. @@ -932,6 +1071,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpValue const& GetDescriptor() const { return m_descriptor; } + /** + * @brief Convert an existing AmqpComposite to an AmqpValue. + */ + explicit operator AmqpValue() const + { + return static_cast<_detail::UniqueAmqpValueHandle>(*this); + } + + protected: /** * @brief Convert an AmqpComposite instance to a uAMQP AMQP_VALUE. * @@ -942,13 +1090,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { * by the caller. */ operator _detail::UniqueAmqpValueHandle() const; - /** - * @brief Convert an existing AmqpComposite to an AmqpValue. - */ - explicit operator AmqpValue() const - { - return static_cast<_detail::UniqueAmqpValueHandle>(*this).get(); - } private: AmqpValue m_descriptor; @@ -975,6 +1116,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpDescribed(AmqpSymbol const& descriptor, AmqpValue const& value); + ~AmqpDescribed() = default; + /** @brief Construct a new AmqpDescribed object with a 64bit descriptor. * * @param descriptor - the Descriptor for the described value. The descriptor value SHOULD @@ -1041,6 +1184,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ operator _detail::UniqueAmqpValueHandle() const; + /** @brief Convert this AmqpDescribed to an AmqpValue. + * + * @return An AmqpValue containing this AmqpDescribed. + */ + AmqpValue AsAmqpValue() const; + /** * @brief Compare this AmqpDescribed value with another. * diff --git a/sdk/core/azure-core-amqp/src/amqp/connection.cpp b/sdk/core/azure-core-amqp/src/amqp/connection.cpp index 6415089cf..5d16a8522 100644 --- a/sdk/core/azure-core-amqp/src/amqp/connection.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/connection.cpp @@ -269,9 +269,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { throw std::runtime_error("Failed to set max frame size."); } - if (connection_set_properties( - m_connection.get(), - static_cast(m_options.Properties).get())) + if (connection_set_properties(m_connection.get(), m_options.Properties.AsAmqpValue())) { throw std::runtime_error("Failed to set connection properties."); } @@ -500,11 +498,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { throw std::logic_error("Connection already closed."); } - std::unique_lock lock(m_amqpMutex); - // Stop polling on this connection, we're shutting it down. EnableAsyncOperation(false); + std::unique_lock lock(m_amqpMutex); + if (m_connectionOpened) { if (connection_close( @@ -556,7 +554,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { throw std::runtime_error("COuld not get properties."); } - return Models::AmqpValue{value}.AsMap(); + return Models::AmqpValue{Models::_detail::UniqueAmqpValueHandle{value}}.AsMap(); } uint32_t ConnectionImpl::GetRemoteMaxFrameSize() const diff --git a/sdk/core/azure-core-amqp/src/amqp/link.cpp b/sdk/core/azure-core-amqp/src/amqp/link.cpp index eea425061..bbc8e02cf 100644 --- a/sdk/core/azure-core-amqp/src/amqp/link.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/link.cpp @@ -314,7 +314,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { void LinkImpl::SetDesiredCapabilities(Models::AmqpValue desiredCapabilities) { - if (link_set_desired_capabilities(m_link, desiredCapabilities)) { throw std::runtime_error("Could not set desired capabilities."); @@ -328,7 +327,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { throw std::runtime_error("Could not convert field to header."); } - return Models::AmqpValue{desiredCapabilitiesVal}; + return Models::AmqpValue{Models::_detail::UniqueAmqpValueHandle{desiredCapabilitiesVal}}; } void LinkImpl::SubscribeToDetachEvent(OnLinkDetachEvent onLinkDetach) @@ -355,6 +354,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } } + void LinkImpl::Poll() + { + // Ensure that the connection hierarchy's state is not modified while polling on the link. + auto lock{m_session->GetConnection()->Lock()}; + link_dowork(m_link); + } + void LinkImpl::ResetLinkCredit(std::uint32_t linkCredit, bool drain) { if (link_reset_link_credit(m_link, linkCredit, drain)) diff --git a/sdk/core/azure-core-amqp/src/amqp/management.cpp b/sdk/core/azure-core-amqp/src/amqp/management.cpp index f6ba1affd..5a58fc6fd 100644 --- a/sdk/core/azure-core-amqp/src/amqp/management.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/management.cpp @@ -112,7 +112,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { try { m_messageSender->Open(context); + m_messageSenderOpen = true; m_messageReceiver->Open(context); + m_messageReceiverOpen = true; } catch (std::runtime_error const& e) { diff --git a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp index d6038d2ac..27e4b06c6 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp @@ -211,7 +211,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { m_link->SetMaxLinkCredit(m_options.MaxLinkCredit); } - m_link->SetAttachProperties(static_cast(m_options.Properties)); + m_link->SetAttachProperties(m_options.Properties.AsAmqpValue()); } AMQP_VALUE MessageReceiverImpl::OnMessageReceivedFn(const void* context, MESSAGE_HANDLE message) @@ -234,7 +234,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { rv = receiver->OnMessageReceived(incomingMessage); } - return amqpvalue_clone(rv); + return rv.Release(); } return Models::_internal::Messaging::DeliveryRejected( @@ -474,6 +474,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } // Mark the connection as async so that we can use the async APIs. m_session->GetConnection()->EnableAsyncOperation(true); + + // And add the link to the list of pollable items. + Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link); } void MessageReceiverImpl::Close() @@ -485,7 +488,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { Log::Stream(Logger::Level::Verbose) << "Lock for Closing message receiver."; } - auto lock{m_session->GetConnection()->Lock()}; AZURE_ASSERT(m_link); if (m_options.EnableTrace) @@ -497,8 +499,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { Log::Stream(Logger::Level::Verbose) << "Closing message receiver. Stop async"; } + + Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable( + m_link); // This will ensure that the link is cleaned up on the next poll() + m_session->GetConnection()->EnableAsyncOperation(false); + auto lock{m_session->GetConnection()->Lock()}; + // Clear messages from the queue. m_messageQueue.Clear(); if (messagereceiver_close(m_messageReceiver.get())) diff --git a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp index f303ee30f..1afffd7dc 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp @@ -314,6 +314,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { Log::Stream(Logger::Level::Verbose) << "Opening message sender. Enable async operation."; } m_session->GetConnection()->EnableAsyncOperation(true); + + // Enable async on the link as well. + Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link); + m_senderOpen = true; } @@ -325,7 +329,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { Log::Stream(Logger::Level::Verbose) << "Lock for Closing message sender."; } - auto lock{m_session->GetConnection()->Lock()}; if (m_options.EnableTrace) { @@ -333,6 +336,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { Log::Stream(Logger::Level::Verbose) << "Unsubscribe from link detach event."; } m_link->UnsubscribeFromDetachEvent(); + + Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable( + m_link); // This will ensure that the link is cleaned up on the next poll() + #if SENDER_SYNCHRONOUS_CLOSE bool shouldWaitForClose = m_currentState == _internal::MessageSenderState::Closing || m_currentState == _internal::MessageSenderState::Open; @@ -340,6 +347,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { m_session->GetConnection()->EnableAsyncOperation(false); + auto lock{m_session->GetConnection()->Lock()}; + if (messagesender_close(m_messageSender.get())) { throw std::runtime_error("Could not close message sender"); @@ -401,7 +410,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { result = _internal::MessageSendStatus::Timeout; break; } - onComplete(result, disposition); + // Reference disposition so that we don't over-release when the AmqpValue passed to OnComplete + // is destroyed. + onComplete(result, Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(disposition)}); } }; diff --git a/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp index b2466b7c2..3059b6827 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/link_impl.hpp @@ -3,6 +3,7 @@ #pragma once +#include "azure/core/amqp/internal/common/global_state.hpp" #include "azure/core/amqp/internal/models/amqp_error.hpp" #include "azure/core/amqp/internal/models/message_source.hpp" #include "azure/core/amqp/internal/models/message_target.hpp" @@ -17,7 +18,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { - class LinkImpl final : public std::enable_shared_from_this { + class LinkImpl final : public std::enable_shared_from_this, + public Common::_detail::Pollable { using OnLinkDetachEvent = std::function; @@ -96,5 +98,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE m_linkSubscriptionHandle{}; static void OnLinkDetachEventFn(void* context, ERROR_HANDLE error); + + // Inherited via Pollable + void Poll() override; }; }}}} // namespace Azure::Core::Amqp::_detail diff --git a/sdk/core/azure-core-amqp/src/amqp/session.cpp b/sdk/core/azure-core-amqp/src/amqp/session.cpp index b4ac22e9a..c7610df7d 100644 --- a/sdk/core/azure-core-amqp/src/amqp/session.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/session.cpp @@ -235,15 +235,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { _internal::LinkEndpoint linkEndpoint(LinkEndpointFactory::CreateLinkEndpoint(newLinkEndpoint)); if (session->m_eventHandler) { + // The input source, target, and properties are owned by the caller, so we need to clone + // them before putting them in a UniqueAmqpValueHandle so we can construct an AmqpValue. return session->m_eventHandler->OnLinkAttached( _detail::SessionFactory::CreateFromInternal(session->shared_from_this()), linkEndpoint, name, role == role_receiver ? Azure::Core::Amqp::_internal::SessionRole::Receiver : Azure::Core::Amqp::_internal::SessionRole::Sender, - source, - target, - properties); + Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(source)}, + Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(target)}, + Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(properties)}); } else { diff --git a/sdk/core/azure-core-amqp/src/common/global_state.cpp b/sdk/core/azure-core-amqp/src/common/global_state.cpp index 780d2ad98..4b576a708 100644 --- a/sdk/core/azure-core-amqp/src/common/global_state.cpp +++ b/sdk/core/azure-core-amqp/src/common/global_state.cpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -23,6 +24,7 @@ using namespace Azure::Core::Diagnostics::_internal; using namespace Azure::Core::Diagnostics; +// cspell: words gballoc namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail { // Logging callback for uAMQP and azure-c-shared-utility. @@ -87,6 +89,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace GlobalStateHolder::GlobalStateHolder() { +#if defined(GB_DEBUG_ALLOC) + gballoc_init(); +#endif if (platform_init()) { throw std::runtime_error("Could not initialize platform."); @@ -98,21 +103,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace m_pollingThread = std::thread([this]() { do { - std::list> capturedList; { - std::unique_lock lock{m_pollablesMutex}; - // If there are no pollables, there's no point in doing any work. - if (m_pollables.empty()) + std::list> capturedList; { - continue; + std::unique_lock lock{m_pollablesMutex}; + // If there are no pollables, there's no point in doing any work. + if (m_pollables.empty()) + { + continue; + } + capturedList = m_pollables; + m_activelyPolling = true; } - capturedList = m_pollables; - } - for (auto const& pollable : capturedList) - { - pollable->Poll(); + for (auto const& pollable : capturedList) + { + pollable->Poll(); + } } + m_activelyPolling = false; // std::this_thread::yield(); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } while (!m_stopped); @@ -127,6 +136,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace m_pollingThread.join(); } platform_deinit(); +#if defined(GB_DEBUG_ALLOC) + gballoc_deinit(); +#endif } void GlobalStateHolder::AddPollable(std::shared_ptr pollable) @@ -138,6 +150,35 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace } } + void GlobalStateHolder::RemovePollable(std::shared_ptr pollable) + { + // There is a bit of a complicated lock-free dance happening here. + // The m_pollables list is accessed by the polling thread, and the list is modified by the user + // thread. To ensure integrity of the list, the polling thread takes the lock, copies the + // pollable from the list, releases the lock and then iterates over the pollables at the + // snapshot. + // + // Because the pollable is a shared_ptr, the user thread can remove a pollable while the + // background thread is polling. + // + // But we want to make sure that the thread has finished polling (and thus has removed the copy + // of the pollables list). For that, we have the m_activelyPolling variable. It is set under the + // pollables lock, and cleared after the polling thread has finished polling (outside the lock). + // + // This means that we can spin on the m_activelyPolling variable *under* the pollables lock safe + // in the knowledge that IF the variable is set to true, it means that we acquired the + // pollablesMutex during the interval when the captured list is being interated over. And that + // the m_activelyPolling variable will only be cleared AFTER the captured list is freed. + // + + std::lock_guard lock(m_pollablesMutex); + m_pollables.remove(pollable); + // Spin until m_activelyPolling is false, this ensures that the polling thread is not using the + // capturedList copy of m_pollables. + while (m_activelyPolling.load()) + ; + } + GlobalStateHolder* GlobalStateHolder::GlobalStateInstance() { static GlobalStateHolder globalState; diff --git a/sdk/core/azure-core-amqp/src/models/amqp_error.cpp b/sdk/core/azure-core-amqp/src/models/amqp_error.cpp index 6f5f6b8fe..73408c2fe 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_error.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_error.cpp @@ -42,7 +42,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace AMQP_VALUE info; if (!error_get_info(handle, &info) && info) { - rv.Info = AmqpValue{info}.AsMap(); + // error_get_info returns the AMQP value in place, so we clone it before passing it to the + // UniqueAmqpValueHandle. + rv.Info = AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(info)}}.AsMap(); } return rv; @@ -57,14 +59,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } if (!error.Info.empty()) { - error_set_info(errorHandle.get(), AmqpValue{error.Info}); + AmqpValue infoValue(error.Info.AsAmqpValue()); + error_set_info(errorHandle.get(), infoValue); } // amqpvalue_create_error clones the error handle, so we remember it separately. _detail::UniqueAmqpValueHandle handleAsValue{amqpvalue_create_error(errorHandle.get())}; // The AmqpValue constructor will clone the handle passed into it. // The UniqueAmqpValueHandle will take care of freeing the cloned handle. - return handleAsValue.get(); + return handleAsValue; } std::ostream& operator<<(std::ostream& os, AmqpError const& error) { diff --git a/sdk/core/azure-core-amqp/src/models/amqp_header.cpp b/sdk/core/azure-core-amqp/src/models/amqp_header.cpp index aed2e997a..252194453 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_header.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_header.cpp @@ -29,7 +29,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { : true); } - MessageHeader _internal::MessageHeaderFactory::FromUamqp(UniqueMessageHeaderHandle const& handle) + MessageHeader _internal::MessageHeaderFactory::FromUamqp( + _detail::UniqueMessageHeaderHandle const& handle) { MessageHeader rv; bool boolValue; @@ -63,9 +64,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { return rv; } - UniqueMessageHeaderHandle _internal::MessageHeaderFactory::ToUamqp(MessageHeader const& header) + _detail::UniqueMessageHeaderHandle _internal::MessageHeaderFactory::ToUamqp( + MessageHeader const& header) { - UniqueMessageHeaderHandle rv{header_create()}; + _detail::UniqueMessageHeaderHandle rv{header_create()}; if (header.Durable) { if (header_set_durable(rv.get(), header.Durable)) @@ -130,14 +132,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { size_t MessageHeader::GetSerializedSize(MessageHeader const& properties) { auto handle = _internal::MessageHeaderFactory::ToUamqp(properties); - AmqpValue propertiesAsValue{amqpvalue_create_header(handle.get())}; + AmqpValue propertiesAsValue{ + Models::_detail::UniqueAmqpValueHandle{amqpvalue_create_header(handle.get())}}; return AmqpValue::GetSerializedSize(propertiesAsValue); } std::vector MessageHeader::Serialize(MessageHeader const& header) { auto handle = _internal::MessageHeaderFactory::ToUamqp(header); - AmqpValue headerAsValue{amqpvalue_create_header(handle.get())}; + AmqpValue headerAsValue{ + Models::_detail::UniqueAmqpValueHandle{amqpvalue_create_header(handle.get())}}; return Models::AmqpValue::Serialize(headerAsValue); } @@ -149,7 +153,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { { throw std::runtime_error("Could not convert value to AMQP Header."); } - UniqueMessageHeaderHandle uniqueHandle{handle}; + _detail::UniqueMessageHeaderHandle uniqueHandle{handle}; handle = nullptr; return _internal::MessageHeaderFactory::FromUamqp(uniqueHandle); } diff --git a/sdk/core/azure-core-amqp/src/models/amqp_message.cpp b/sdk/core/azure-core-amqp/src/models/amqp_message.cpp index f903efb5b..4e913bcdc 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_message.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_message.cpp @@ -77,7 +77,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!message_get_delivery_annotations(message, &annotationsVal) && annotationsVal != nullptr) { UniqueAmqpValueHandle deliveryAnnotations(annotationsVal); - auto deliveryMap = AmqpValue{deliveryAnnotations.get()}.AsMap(); + auto deliveryMap = AmqpValue{deliveryAnnotations}.AsMap(); rv.DeliveryAnnotations = deliveryMap; } } @@ -89,7 +89,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { UniqueAmqpValueHandle messageAnnotations(annotationVal); if (messageAnnotations) { - auto messageMap = AmqpValue{messageAnnotations.get()}.AsMap(); + auto messageMap = AmqpValue{messageAnnotations}.AsMap(); rv.MessageAnnotations = messageMap; } } @@ -159,7 +159,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!message_get_delivery_tag(message, &deliveryTagVal)) { UniqueAmqpValueHandle deliveryTag(deliveryTagVal); - rv.DeliveryTag = AmqpValue{deliveryTag.get()}; + rv.DeliveryTag = AmqpValue{deliveryTag}; } } { @@ -168,7 +168,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { { UniqueAmqpValueHandle footerAnnotations(footerVal); footerVal = nullptr; - auto footerMap = AmqpValue{footerAnnotations.get()}.AsMap(); + auto footerMap = AmqpValue{footerAnnotations}.AsMap(); rv.Footer = footerMap; } } @@ -220,7 +220,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { AMQP_VALUE bodyValue; if (!message_get_body_amqp_value_in_place(message, &bodyValue)) { - rv.m_amqpValueBody = bodyValue; + rv.m_amqpValueBody = _detail::UniqueAmqpValueHandle{amqpvalue_clone(bodyValue)}; } rv.BodyType = MessageBodyType::Value; } @@ -258,8 +258,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!message.DeliveryAnnotations.empty()) { - if (message_set_delivery_annotations( - rv.get(), static_cast(message.DeliveryAnnotations).get())) + if (message_set_delivery_annotations(rv.get(), message.DeliveryAnnotations.AsAmqpValue())) { throw std::runtime_error("Could not set delivery annotations."); } @@ -267,8 +266,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!message.MessageAnnotations.empty()) { - if (message_set_message_annotations( - rv.get(), static_cast(message.MessageAnnotations).get())) + if (message_set_message_annotations(rv.get(), message.MessageAnnotations.AsAmqpValue())) { throw std::runtime_error("Could not set message annotations."); } @@ -289,8 +287,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } appProperties.emplace(val); } - if (message_set_application_properties( - rv.get(), static_cast(appProperties).get())) + if (message_set_application_properties(rv.get(), appProperties.AsAmqpValue())) { throw std::runtime_error("Could not set application properties."); } @@ -298,8 +295,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!message.DeliveryTag.IsNull()) { - if (message_set_delivery_tag( - rv.get(), static_cast(message.DeliveryTag).get())) + if (message_set_delivery_tag(rv.get(), message.DeliveryTag)) { throw std::runtime_error("Could not set delivery tag."); } @@ -307,7 +303,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!message.Footer.empty()) { - if (message_set_footer(rv.get(), static_cast(message.Footer).get())) + if (message_set_footer(rv.get(), message.Footer.AsAmqpValue())) { throw std::runtime_error("Could not set message annotations."); } @@ -331,7 +327,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { case MessageBodyType::Sequence: for (auto const& sequenceVal : message.m_amqpSequenceBody) { - if (message_add_body_amqp_sequence(rv.get(), AmqpValue(sequenceVal))) + if (message_add_body_amqp_sequence(rv.get(), sequenceVal.AsAmqpValue())) { throw std::runtime_error("Could not set message body AMQP sequence value."); } @@ -424,16 +420,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } if (!message.DeliveryAnnotations.empty()) { - AmqpValue deliveryAnnotations{ - amqpvalue_create_delivery_annotations(AmqpValue{message.DeliveryAnnotations})}; + AmqpValue deliveryAnnotations{Models::_detail::UniqueAmqpValueHandle{ + amqpvalue_create_delivery_annotations(message.DeliveryAnnotations.AsAmqpValue())}}; auto serializedDeliveryAnnotations = AmqpValue::Serialize(deliveryAnnotations); rv.insert( rv.end(), serializedDeliveryAnnotations.begin(), serializedDeliveryAnnotations.end()); } if (!message.MessageAnnotations.empty()) { - AmqpValue messageAnnotations{ - amqpvalue_create_message_annotations(AmqpValue{message.MessageAnnotations})}; + AmqpValue messageAnnotations{Models::_detail::UniqueAmqpValueHandle{ + amqpvalue_create_message_annotations(message.MessageAnnotations.AsAmqpValue())}}; auto serializedAnnotations = AmqpValue::Serialize(messageAnnotations); rv.insert(rv.end(), serializedAnnotations.begin(), serializedAnnotations.end()); } @@ -459,7 +455,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } appProperties.emplace(val); } - AmqpValue propertiesValue{amqpvalue_create_application_properties(AmqpValue{appProperties})}; + AmqpValue propertiesValue{Models::_detail::UniqueAmqpValueHandle{ + amqpvalue_create_application_properties(appProperties.AsAmqpValue())}}; auto serializedApplicationProperties = AmqpValue::Serialize(propertiesValue); rv.insert( rv.end(), serializedApplicationProperties.begin(), serializedApplicationProperties.end()); @@ -476,7 +473,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { // described body. AmqpDescribed describedBody( static_cast(AmqpDescriptors::DataAmqpValue), message.m_amqpValueBody); - auto serializedBodyValue = AmqpValue::Serialize(static_cast(describedBody)); + auto serializedBodyValue = AmqpValue::Serialize(describedBody.AsAmqpValue()); rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end()); } break; @@ -484,8 +481,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { for (auto const& val : message.m_binaryDataBody) { AmqpDescribed describedBody( - static_cast(AmqpDescriptors::DataBinary), static_cast(val)); - auto serializedBodyValue = AmqpValue::Serialize(static_cast(describedBody)); + static_cast(AmqpDescriptors::DataBinary), val.AsAmqpValue()); + auto serializedBodyValue = AmqpValue::Serialize(describedBody.AsAmqpValue()); rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end()); } break; @@ -493,16 +490,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { for (auto const& val : message.m_amqpSequenceBody) { AmqpDescribed describedBody( - static_cast(AmqpDescriptors::DataAmqpSequence), - static_cast(val)); - auto serializedBodyValue = AmqpValue::Serialize(static_cast(describedBody)); + static_cast(AmqpDescriptors::DataAmqpSequence), val.AsAmqpValue()); + auto serializedBodyValue = AmqpValue::Serialize(describedBody.AsAmqpValue()); rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end()); } } } if (!message.Footer.empty()) { - AmqpValue footer{amqpvalue_create_footer(AmqpValue{message.Footer})}; + AmqpValue footer{Models::_detail::UniqueAmqpValueHandle{ + amqpvalue_create_footer(message.Footer.AsAmqpValue())}}; auto serializedFooter = AmqpValue::Serialize(footer); rv.insert(rv.end(), serializedFooter.begin(), serializedFooter.end()); } @@ -547,7 +544,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { { auto deserializer = static_cast(context); - deserializer->OnAmqpMessageFieldDecoded(value); + deserializer->OnAmqpMessageFieldDecoded( + _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}); } // Invoked when a message field diff --git a/sdk/core/azure-core-amqp/src/models/amqp_properties.cpp b/sdk/core/azure-core-amqp/src/models/amqp_properties.cpp index 63dadf233..8171dabf6 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_properties.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_properties.cpp @@ -32,12 +32,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { // properties_get_message_id returns the value in-place. if (!properties_get_message_id(properties.get(), &value)) { - rv.MessageId = value; + rv.MessageId = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}; } if (!properties_get_correlation_id(properties.get(), &value)) { - rv.CorrelationId = value; + rv.CorrelationId = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}; } { @@ -53,7 +53,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!properties_get_to(properties.get(), &value)) { - rv.To = value; + rv.To = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}; } const char* stringValue; @@ -66,7 +66,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { if (!properties_get_reply_to(properties.get(), &value)) { - rv.ReplyTo = value; + rv.ReplyTo = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}; } if (!properties_get_content_type(properties.get(), &stringValue)) @@ -114,7 +114,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { UniquePropertiesHandle _internal::MessagePropertiesFactory::ToUamqp( MessageProperties const& properties) { - Azure::Core::_internal::UniqueHandle returnValue(properties_create()); + UniquePropertiesHandle returnValue(properties_create()); if (properties.MessageId.HasValue()) { if (properties_set_message_id(returnValue.get(), properties.MessageId.Value())) @@ -276,7 +276,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { std::vector MessageProperties::Serialize(MessageProperties const& properties) { auto handle = _internal::MessagePropertiesFactory::ToUamqp(properties); - AmqpValue propertiesAsValue{amqpvalue_create_properties(handle.get())}; + Models::_detail::UniqueAmqpValueHandle propertiesAsuAMQPValue{ + amqpvalue_create_properties(handle.get())}; + AmqpValue propertiesAsValue{propertiesAsuAMQPValue}; return Models::AmqpValue::Serialize(propertiesAsValue); } diff --git a/sdk/core/azure-core-amqp/src/models/amqp_value.cpp b/sdk/core/azure-core-amqp/src/models/amqp_value.cpp index 920258ed2..0e94763de 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_value.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_value.cpp @@ -44,6 +44,107 @@ namespace Azure { namespace Core { namespace _internal { }}} // namespace Azure::Core::_internal namespace Azure { namespace Core { namespace Amqp { namespace Models { + namespace _detail { + + std::ostream& operator<<(std::ostream& os, AMQP_TYPE const& value) + { + switch (value) + { + case AMQP_TYPE_INVALID: + os << "INVALID"; + break; + case AMQP_TYPE_NULL: + os << "NULL"; + break; + case AMQP_TYPE_BOOL: + os << "BOOL"; + break; + case AMQP_TYPE_UBYTE: + os << "UBYTE"; + break; + case AMQP_TYPE_USHORT: + os << "USHORT"; + break; + case AMQP_TYPE_UINT: + os << "UINT"; + break; + case AMQP_TYPE_ULONG: + os << "ULONG"; + break; + case AMQP_TYPE_BYTE: + os << "BYTE"; + break; + case AMQP_TYPE_SHORT: + os << "SHORT"; + break; + case AMQP_TYPE_INT: + os << "INT"; + break; + case AMQP_TYPE_LONG: + os << "LONG"; + break; + case AMQP_TYPE_FLOAT: + os << "FLOAT"; + break; + case AMQP_TYPE_DOUBLE: + os << "DOUBLE"; + break; + case AMQP_TYPE_CHAR: + os << "CHAR"; + break; + case AMQP_TYPE_TIMESTAMP: + os << "TIMESTAMP"; + break; + case AMQP_TYPE_UUID: + os << "UUID"; + break; + case AMQP_TYPE_BINARY: + os << "BINARY"; + break; + case AMQP_TYPE_STRING: + os << "STRING"; + break; + case AMQP_TYPE_SYMBOL: + os << "SYMBOL"; + break; + case AMQP_TYPE_LIST: + os << "LIST"; + break; + case AMQP_TYPE_MAP: + os << "MAP"; + break; + case AMQP_TYPE_ARRAY: + os << "ARRAY"; + break; + case AMQP_TYPE_DESCRIBED: + os << "DESCRIBED"; + break; + case AMQP_TYPE_COMPOSITE: + os << "COMPOSITE"; + break; + case AMQP_TYPE_UNKNOWN: + os << "UNKNOWN"; + break; + } + return os; + } + std::ostream& operator<<(std::ostream& os, AMQP_VALUE_DATA_TAG* const value) + { + if (value != nullptr) + { + os << "AMQP_VALUE: " << static_cast(value) << " " << amqpvalue_get_type(value) + << ": "; + char* valueAsString{amqpvalue_to_string(value)}; + os << valueAsString; + free(valueAsString); + } + else + { + os << "AMQP_VALUE: nullptr"; + } + return os; + } + } // namespace _detail AmqpValue::~AmqpValue() {} @@ -76,19 +177,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { : m_value{amqpvalue_clone(that.m_value.get())} { } + AmqpValue::AmqpValue(AmqpValue&& that) noexcept : m_value{that.m_value.release()} { that.m_value = nullptr; } + ///@cond - AmqpValue::AmqpValue(AMQP_VALUE_DATA_TAG* value) + AmqpValue::AmqpValue(UniqueAmqpValueHandle const& value) { // We shouldn't take ownership of the incoming value, so instead we clone it. // if no value is provided, treat it as null. if (value) { - m_value.reset(amqpvalue_clone(value)); + m_value.reset(amqpvalue_clone(value.get())); } else { @@ -97,6 +200,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } /** @brief Internal accessor to convert an AmqpValue to a uAMQP AMQP_VALUE. */ AmqpValue::operator AMQP_VALUE_DATA_TAG*() const { return m_value.get(); } + + AMQP_VALUE_DATA_TAG* AmqpValue::Release() { return m_value.release(); } ///@endcond AmqpValue& AmqpValue::operator=(AmqpValue const& that) @@ -106,7 +211,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } AmqpValue& AmqpValue::operator=(AmqpValue&& that) noexcept { - m_value.reset(that.m_value.release()); + m_value = std::move(that.m_value); return *this; } @@ -257,7 +362,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { { throw std::runtime_error("Could not retrieve uuid value"); } - std::array uuidArray; + std::array uuidArray{}; memcpy(uuidArray.data(), value, 16); return Azure::Core::Uuid::CreateFromArray(uuidArray); } @@ -423,7 +528,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { static void OnAmqpValueDecoded(void* context, AMQP_VALUE value) { auto deserializer = static_cast(context); - deserializer->m_decodedValue = value; + deserializer->m_decodedValue = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}; } }; class AmqpValueSerializer final { @@ -468,7 +573,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { std::ostream& operator<<(std::ostream& os, AmqpValue const& value) { - char* valueAsString = amqpvalue_to_string(value); + char* valueAsString{amqpvalue_to_string(value)}; os << valueAsString; free(valueAsString); return os; @@ -507,7 +612,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { m_value.reserve(arraySize); for (std::uint32_t i = 0; i < arraySize; i += 1) { - m_value.push_back(amqpvalue_get_array_item(value, i)); + // amqpvalue_get_array_item clones the value. We don't need to clone it again. + UniqueAmqpValueHandle item{amqpvalue_get_array_item(value, i)}; + m_value.push_back(item); } } AmqpArray::AmqpArray(initializer_type const& initializer) : AmqpCollectionBase(initializer) @@ -561,7 +668,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { key.reset(kv); val.reset(vv); } - m_value.emplace(std::make_pair(AmqpValue(key.get()), AmqpValue(val.get()))); + m_value.emplace(std::make_pair(AmqpValue(key), AmqpValue(val))); } } @@ -593,7 +700,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } for (std::uint32_t i = 0; i < listSize; i += 1) { - push_back(amqpvalue_get_list_item(value, i)); + UniqueAmqpValueHandle item{amqpvalue_get_list_item(value, i)}; + push_back(item); } } @@ -671,11 +779,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { return symbol; } - AmqpTimestamp::operator AmqpValue() const - { - return static_cast(*this).get(); - } - namespace { std::chrono::milliseconds GetMillisecondsFromAmqp(AMQP_VALUE value) { @@ -716,10 +819,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } for (std::uint32_t i = 0; i < compositeSize; i += 1) { - push_back(amqpvalue_get_composite_item_in_place(value, i)); + push_back(_detail::UniqueAmqpValueHandle{ + amqpvalue_clone(amqpvalue_get_composite_item_in_place(value, i))}); } - m_descriptor = amqpvalue_get_inplace_descriptor(value); + m_descriptor + = _detail::UniqueAmqpValueHandle{amqpvalue_clone(amqpvalue_get_inplace_descriptor(value))}; if (m_descriptor.IsNull()) { throw std::runtime_error("Could not read descriptor for composite value."); @@ -733,6 +838,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { { } + template <> + _detail::AmqpCollectionBase, AmqpComposite>:: + operator UniqueAmqpValueHandle() const + { + return nullptr; + } + AmqpComposite::operator _detail::UniqueAmqpValueHandle() const { UniqueAmqpValueHandle composite{ @@ -756,13 +868,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { throw std::runtime_error("Input AMQP value MUST be a described value."); } - m_descriptor = amqpvalue_get_inplace_descriptor(value); + m_descriptor + = _detail::UniqueAmqpValueHandle{amqpvalue_clone(amqpvalue_get_inplace_descriptor(value))}; if (m_descriptor.IsNull()) { throw std::runtime_error("Could not read descriptor for described value."); } - m_value = amqpvalue_get_inplace_described_value(value); + m_value = _detail::UniqueAmqpValueHandle{ + amqpvalue_clone(amqpvalue_get_inplace_described_value(value))}; if (m_value.IsNull()) { throw std::runtime_error("Could not read descriptor for described value."); @@ -770,7 +884,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { } AmqpDescribed::AmqpDescribed(AmqpSymbol const& descriptor, AmqpValue const& value) - : m_descriptor(static_cast(descriptor).get()), m_value(value) + : m_descriptor{descriptor.AsAmqpValue()}, m_value{value} { } AmqpDescribed::AmqpDescribed(uint64_t descriptor, AmqpValue const& value) @@ -788,9 +902,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { return composite; } + AmqpValue AmqpDescribed::AsAmqpValue() const + { + return AmqpValue{static_cast<_detail::UniqueAmqpValueHandle>(*this)}; + } + AmqpDescribed::operator AmqpValue const() const { - return static_cast(*this).get(); + return static_cast(*this); } namespace { @@ -864,7 +983,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { std::ostream& operator<<(std::ostream& os, AmqpArray const& value) { // Let the AmqpValue specialization handle serialization of the array. - AmqpValue arrayValue(value); + AmqpValue arrayValue(value.AsAmqpValue()); os << arrayValue; return os; } @@ -872,7 +991,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { std::ostream& operator<<(std::ostream& os, AmqpList const& value) { // Let the AmqpValue specialization handle serialization of the list. - AmqpValue arrayValue(value); + AmqpValue arrayValue(value.AsAmqpValue()); os << arrayValue; return os; } @@ -880,14 +999,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { std::ostream& operator<<(std::ostream& os, AmqpMap const& value) { // Let the AmqpValue specialization handle serialization of the map. - AmqpValue mapValue(value); + AmqpValue mapValue(value.AsAmqpValue()); os << mapValue; return os; } std::ostream& operator<<(std::ostream& os, AmqpSymbol const& value) { // Let the AmqpValue specialization handle serialization of the array. - AmqpValue arrayValue(static_cast(value)); + AmqpValue arrayValue(value.AsAmqpValue()); os << arrayValue; return os; } diff --git a/sdk/core/azure-core-amqp/src/models/message_source.cpp b/sdk/core/azure-core-amqp/src/models/message_source.cpp index b56de8c95..0b938b85d 100644 --- a/sdk/core/azure-core-amqp/src/models/message_source.cpp +++ b/sdk/core/azure-core-amqp/src/models/message_source.cpp @@ -161,9 +161,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } if (!options.DynamicNodeProperties.empty()) { - if (source_set_dynamic_node_properties( - m_source.get(), - static_cast<_detail::UniqueAmqpValueHandle>(options.DynamicNodeProperties).get())) + AmqpValue dynamicNodeProperties(options.DynamicNodeProperties.AsAmqpValue()); + if (source_set_dynamic_node_properties(m_source.get(), dynamicNodeProperties)) { throw std::runtime_error("Could not set dynamic node properties."); } @@ -177,8 +176,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } if (!options.Filter.empty()) { - if (source_set_filter( - m_source.get(), static_cast<_detail::UniqueAmqpValueHandle>(options.Filter).get())) + if (source_set_filter(m_source.get(), options.Filter.AsAmqpValue())) { throw std::runtime_error("Could not set filter set."); } @@ -192,17 +190,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } if (!options.Outcomes.empty()) { - if (source_set_outcomes( - m_source.get(), static_cast<_detail::UniqueAmqpValueHandle>(options.Outcomes).get())) + if (source_set_outcomes(m_source.get(), options.Outcomes.AsAmqpValue())) { throw std::runtime_error("Could not set outcomes."); } } if (!options.Capabilities.empty()) { - if (source_set_capabilities( - m_source.get(), - static_cast<_detail::UniqueAmqpValueHandle>(options.Capabilities).get())) + if (source_set_capabilities(m_source.get(), options.Capabilities.AsAmqpValue())) { throw std::runtime_error("Could not set capabilities."); } @@ -212,7 +207,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace // Convert the MessageSource into a Value. Models::AmqpValue MessageSource::AsAmqpValue() const { - return amqpvalue_create_source(m_source.get()); + Models::_detail::UniqueAmqpValueHandle sourceValue{amqpvalue_create_source(m_source.get())}; + return sourceValue; } Models::AmqpValue MessageSource::GetAddress() const @@ -222,7 +218,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace { throw std::runtime_error("Could not retrieve address from source."); } - return address; + // source_get_address does not reference its value, so we need to reference it before creating + // an AmqpValueHandle. + return Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(address)}; } TerminusDurability MessageSource::GetTerminusDurability() const @@ -326,7 +324,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace { throw std::runtime_error("Could not get default outcome."); } - return value; + // source_get_default_outcome does not reference the value returned, we reference it so it can + // be put into a UniqueAmqpValueHandle. + return Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}; } Models::AmqpArray MessageSource::GetOutcomes() const @@ -432,14 +432,22 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace AMQP_VALUE outcome; if (!source_get_default_outcome(source, &outcome)) { - os << ", Default Outcome: " << AmqpValue{outcome}; + // source_get_default_outcome does not reference the value returned, we reference it so it + // can be put into a UniqueAmqpValueHandle. + os << ", Default Outcome: " + << AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(outcome)}}; } } { AMQP_VALUE outcomes; if (!source_get_outcomes(source, &outcomes)) { - os << ", Outcomes: " << AmqpValue{outcomes}; + // Most of the time, source_get_outcomes does not reference its input. However, if the + // composite value at location 9 is a symbol, it does reference it. As a consequence, this + // will leak an AMQPSymbol if the value at location 9 is a symbol (as opposed to being an + // array). + os << ", Outcomes: " + << AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(outcomes)}}; } } os << "}"; diff --git a/sdk/core/azure-core-amqp/src/models/message_target.cpp b/sdk/core/azure-core-amqp/src/models/message_target.cpp index 8314836e0..7ae19070b 100644 --- a/sdk/core/azure-core-amqp/src/models/message_target.cpp +++ b/sdk/core/azure-core-amqp/src/models/message_target.cpp @@ -3,6 +3,8 @@ #include "azure/core/amqp/internal/models/message_target.hpp" +#include + #include #include #include @@ -15,6 +17,9 @@ #include +using namespace Azure::Core::Diagnostics::_internal; +using namespace Azure::Core::Diagnostics; + namespace Azure { namespace Core { namespace _internal { void UniqueHandleHelper::FreeMessageTarget(TARGET_HANDLE value) { @@ -23,19 +28,18 @@ namespace Azure { namespace Core { namespace _internal { }}} // namespace Azure::Core::_internal namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal { + extern const char* StringFromTerminusDurability(TerminusDurability); - // MessageTarget::MessageTarget(TARGET_HANDLE handle) : m_target{handle} {} - - MessageTarget::MessageTarget(Models::AmqpValue const& source) + MessageTarget::MessageTarget(Models::AmqpValue const& target) { - if (source.IsNull()) + if (target.IsNull()) { - throw std::invalid_argument("source cannot be null"); + throw std::invalid_argument("target cannot be null"); } TARGET_HANDLE targetHandle; - if (amqpvalue_get_target(source, &targetHandle)) + if (amqpvalue_get_target(target, &targetHandle)) { - throw std::runtime_error("Could not retrieve source from value."); + throw std::runtime_error("Could not retrieve target from value."); } m_target.reset(targetHandle); } @@ -69,7 +73,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace MessageTarget& MessageTarget::operator=(MessageTarget const& that) { - m_target.reset(target_clone(that.m_target.get())); + m_target.reset(target_clone(that)); return *this; } @@ -151,8 +155,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace if (!options.DynamicNodeProperties.empty()) { if (target_set_dynamic_node_properties( - m_target.get(), - static_cast<_detail::UniqueAmqpValueHandle>(options.DynamicNodeProperties).get())) + m_target.get(), options.DynamicNodeProperties.AsAmqpValue())) { throw std::runtime_error("Could not set dynamic node properties."); } @@ -160,9 +163,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace if (!options.Capabilities.empty()) { - if (target_set_capabilities( - m_target.get(), - static_cast<_detail::UniqueAmqpValueHandle>(options.Capabilities).get())) + if (target_set_capabilities(m_target.get(), options.Capabilities.AsAmqpValue())) { throw std::runtime_error("Could not set capabilities."); } @@ -172,7 +173,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace // Convert the MessageSource into a Value. Models::AmqpValue MessageTarget::AsAmqpValue() const { - return amqpvalue_create_target(m_target.get()); + Models::_detail::UniqueAmqpValueHandle targetValue{amqpvalue_create_target(m_target.get())}; + return targetValue; } Models::AmqpValue MessageTarget::GetAddress() const @@ -182,7 +184,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace { throw std::runtime_error("Could not retrieve address from source."); } - return address; + // target_get_address does not reference the underlying address so we need to addref it here so + // it gets freed properly. + return Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(address)}; } TerminusDurability MessageTarget::GetTerminusDurability() const @@ -254,11 +258,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace Models::AmqpMap MessageTarget::GetDynamicNodeProperties() const { AMQP_VALUE value; + // Note: target_get_dynamic_node_properties does NOT reference the value. if (target_get_dynamic_node_properties(m_target.get(), &value)) { throw std::runtime_error("Could not get dynamic."); } - return AmqpValue{value}.AsMap(); + // We clone the value before converting it to a UniqueAmqpValueHandle because the destructor for + // UniqueAmqpValueHandle will remove the reference. + return AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}}.AsMap(); } Models::AmqpArray MessageTarget::GetCapabilities() const @@ -271,8 +278,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace return value; } - extern const char* StringFromTerminusDurability(TerminusDurability); - std::ostream& operator<<(std::ostream& os, MessageTarget const& target) { os << "Target{ "; diff --git a/sdk/core/azure-core-amqp/src/models/messaging_values.cpp b/sdk/core/azure-core-amqp/src/models/messaging_values.cpp index d0aef8581..0f11b97f8 100644 --- a/sdk/core/azure-core-amqp/src/models/messaging_values.cpp +++ b/sdk/core/azure-core-amqp/src/models/messaging_values.cpp @@ -10,7 +10,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal { Models::AmqpValue Messaging::DeliveryAccepted() { - auto rv = messaging_delivery_accepted(); + Models::_detail::UniqueAmqpValueHandle rv{messaging_delivery_accepted()}; if (!rv) { throw std::runtime_error("Could not allocate delivery accepted described value."); @@ -19,7 +19,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } Models::AmqpValue Messaging::DeliveryReleased() { - auto rv = messaging_delivery_released(); + Models::_detail::UniqueAmqpValueHandle rv{messaging_delivery_released()}; if (!rv) { throw std::runtime_error("Could not allocate delivery released described value."); @@ -28,7 +28,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } Models::AmqpValue Messaging::DeliveryReceived(uint32_t sectionNumber, uint64_t sectionOffset) { - auto rv = messaging_delivery_received(sectionNumber, sectionOffset); + Models::_detail::UniqueAmqpValueHandle rv{ + messaging_delivery_received(sectionNumber, sectionOffset)}; if (!rv) { throw std::runtime_error("Could not allocate delivery received described value."); @@ -40,10 +41,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace std::string const& errorDescription, AmqpValue const& errorInfo) { - auto rv = messaging_delivery_rejected( + Models::_detail::UniqueAmqpValueHandle rv{messaging_delivery_rejected( errorCondition.empty() ? nullptr : errorCondition.c_str(), errorDescription.empty() ? nullptr : errorDescription.c_str(), - errorInfo); + errorInfo)}; if (!rv) { throw std::runtime_error("Could not allocate delivery rejected described value."); @@ -55,7 +56,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace bool undeliverableHere, Models::AmqpValue annotations) { - auto rv = messaging_delivery_modified(deliveryFailed, undeliverableHere, annotations); + Models::_detail::UniqueAmqpValueHandle rv{ + messaging_delivery_modified(deliveryFailed, undeliverableHere, annotations)}; if (!rv) { throw std::runtime_error("Could not allocate delivery modified described value."); diff --git a/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp b/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp index b7398552f..bb04a56a3 100644 --- a/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp @@ -133,7 +133,7 @@ TEST_F(TestMessage, TestBodyAmqpSequence) { AmqpMessage message; - message.SetBody({"Test", 95, static_cast(AmqpMap{{3, 5}, {4, 9}})}); + message.SetBody({"Test", 95, AmqpMap{{3, 5}, {4, 9}}.AsAmqpValue()}); EXPECT_EQ(1, message.GetBodyAsAmqpList().size()); EXPECT_EQ("Test", static_cast(message.GetBodyAsAmqpList()[0].at(0))); @@ -152,8 +152,7 @@ TEST_F(TestMessage, TestBodyAmqpSequence) } { AmqpMessage message; - message.SetBody( - {{1}, {"Test", 3}, {"Test", 95, static_cast(AmqpMap{{3, 5}, {4, 9}})}}); + message.SetBody({{1}, {"Test", 3}, {"Test", 95, AmqpMap{{3, 5}, {4, 9}}.AsAmqpValue()}}); EXPECT_EQ(3, message.GetBodyAsAmqpList().size()); EXPECT_EQ("Test", static_cast(message.GetBodyAsAmqpList()[1].at(0))); EXPECT_EQ(95, static_cast(message.GetBodyAsAmqpList()[2].at(1))); @@ -231,7 +230,7 @@ TEST_F(MessageSerialization, SerializeMessageBodyBinary) std::vector buffer; AmqpMessage message; message.Properties.MessageId = "12345"; - message.SetBody(static_cast(AmqpMap{{"key1", "value1"}, {"key2", "value2"}})); + message.SetBody(AmqpMap{{"key1", "value1"}, {"key2", "value2"}}.AsAmqpValue()); buffer = AmqpMessage::Serialize(message); AmqpMessage deserialized = AmqpMessage::Deserialize(buffer.data(), buffer.size()); EXPECT_EQ(message, deserialized); diff --git a/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp b/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp index b8af57c32..2641cfc33 100644 --- a/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp @@ -214,9 +214,9 @@ TEST_F(TestValues, TestBinary) AmqpBinaryData binaryData; binaryData.push_back('a'); binaryData.push_back(3); - AmqpValue value(static_cast<_detail::UniqueAmqpValueHandle>(binaryData).get()); + AmqpValue value{binaryData.AsAmqpValue()}; - EXPECT_FALSE(value < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(binaryData).get())); + EXPECT_FALSE(value < binaryData.AsAmqpValue()); AmqpBinaryData data2(value); EXPECT_EQ(2, data2.size()); @@ -250,10 +250,10 @@ TEST_F(TestValues, TestList) EXPECT_EQ(AmqpValueType::Byte, list1[3].GetType()); EXPECT_EQ(AmqpValue('a'), list1[3]); - AmqpValue value(static_cast<_detail::UniqueAmqpValueHandle>(list1).get()); + AmqpValue value{list1.AsAmqpValue()}; const AmqpList list2(value); - EXPECT_FALSE(value < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(list1).get())); + EXPECT_FALSE(value < list1.AsAmqpValue()); EXPECT_EQ(4, list2.size()); @@ -268,8 +268,8 @@ TEST_F(TestValues, TestList) AmqpList test; AmqpDescribed desc{ static_cast(29), - static_cast(AmqpList{AmqpValue{"test:error"}, AmqpValue{"test description"}})}; - test.push_back(AmqpValue{desc}); + AmqpList{AmqpValue{"test:error"}, AmqpValue{"test description"}}.AsAmqpValue()}; + test.push_back(desc.AsAmqpValue()); EXPECT_EQ(1, test.size()); EXPECT_EQ(AmqpValueType::Described, test[0].GetType()); @@ -291,7 +291,7 @@ TEST_F(TestValues, TestList) AmqpComposite testAsComposite{test[0].AsComposite()}; EXPECT_EQ(testAsComposite.GetDescriptor(), AmqpValue{static_cast(29ll)}); { - AmqpValue testAsValue{test}; + AmqpValue testAsValue{test.AsAmqpValue()}; EXPECT_EQ(AmqpValueType::List, testAsValue.GetType()); auto testAsList{testAsValue.AsList()}; @@ -329,9 +329,9 @@ TEST_F(TestValues, TestMap) EXPECT_EQ(std::string("ABC"), static_cast(map1[AmqpValue(3)])); // Now round-trip the map through an AMQP value and confirm that the values persist. - AmqpValue valueOfMap = static_cast<_detail::UniqueAmqpValueHandle>(map1).get(); + AmqpValue valueOfMap = map1.AsAmqpValue(); AmqpMap map2(valueOfMap); - EXPECT_FALSE(valueOfMap < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(map1).get())); + EXPECT_FALSE(valueOfMap < map1.AsAmqpValue()); EXPECT_EQ(5, static_cast(map2["ABC"])); EXPECT_EQ(std::string("ABC"), static_cast(map2[AmqpValue(3)])); @@ -345,7 +345,7 @@ TEST_F(TestValues, TestArray) EXPECT_EQ(5, array1.size()); - AmqpValue value = static_cast(array1); + AmqpValue value = array1.AsAmqpValue(); EXPECT_EQ(AmqpValueType::Array, value.GetType()); const AmqpArray array2 = value.AsArray(); @@ -354,20 +354,9 @@ TEST_F(TestValues, TestArray) EXPECT_EQ(3, static_cast(array2.at(1))); EXPECT_EQ(5, static_cast(array2.at(2))); EXPECT_FALSE(array1 < array2); - EXPECT_FALSE(value < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(array2).get())); -} - -TEST_F(TestValues, TestArray1) -{ - AmqpArray array1{1}; - - EXPECT_EQ(1, array1.size()); - - AmqpValue value = static_cast(array1); - EXPECT_EQ(AmqpValueType::Array, value.GetType()); - - GTEST_LOG_(INFO) << "Copy AMQP value as array to a new value"; - const AmqpArray array2 = value.AsArray(); + { + EXPECT_FALSE(value < array2.AsAmqpValue()); + } } TEST_F(TestValues, TestArrayDifferentTypes) @@ -399,7 +388,7 @@ TEST_F(TestValues, TestTimestamp) std::chrono::system_clock::now().time_since_epoch())}; AmqpTimestamp value{timeNow}; EXPECT_EQ(static_cast(value), timeNow); - AmqpValue av{value}; + AmqpValue av{value.AsAmqpValue()}; AmqpTimestamp ts2{av.AsTimestamp()}; EXPECT_EQ(timeNow, static_cast(ts2)); @@ -429,7 +418,7 @@ TEST_F(TestValues, TestCompositeValue) { { AmqpComposite value("My Composite Type", {1, 2, 5.5, "ABC", 5}); - EXPECT_EQ(AmqpValueType::Composite, AmqpValue(value).GetType()); + EXPECT_EQ(AmqpValueType::Composite, value.AsAmqpValue().GetType()); EXPECT_EQ(5, value.size()); } @@ -455,7 +444,7 @@ TEST_F(TestValues, TestCompositeValue) // Put some things in the map. { AmqpComposite compositeVal(static_cast(116ull), {25, 25.0f}); - AmqpValue value = static_cast(compositeVal); + AmqpValue value = compositeVal.AsAmqpValue(); AmqpComposite testVal(value.AsComposite()); EXPECT_EQ(compositeVal.size(), testVal.size()); @@ -476,7 +465,7 @@ TEST_F(TestValues, TestDescribed) EXPECT_EQ(AmqpSymbol("My Composite Type"), described1.GetDescriptor().AsSymbol()); EXPECT_EQ(5, static_cast(described1.GetValue())); - AmqpValue value = static_cast(described1); + AmqpValue value = described1.AsAmqpValue(); EXPECT_EQ(AmqpValueType::Described, value.GetType()); AmqpDescribed described2 = value.AsDescribed(); @@ -493,7 +482,7 @@ TEST_F(TestValues, TestDescribed) EXPECT_EQ(937, static_cast(value.GetDescriptor())); EXPECT_EQ(5, static_cast(value.GetValue())); - AmqpValue value2 = static_cast(value); + AmqpValue value2 = value.AsAmqpValue(); AmqpDescribed described2 = value2.AsDescribed(); EXPECT_EQ(AmqpValueType::Described, value2.GetType()); @@ -1272,7 +1261,7 @@ TEST_F(TestValueSerialization, SerializeList) // First form, serialized as first form. { AmqpList emptyList; - AmqpValue value{emptyList}; + AmqpValue value{emptyList.AsAmqpValue()}; std::vector val = AmqpValue::Serialize(value); EXPECT_EQ(val.size(), 1); EXPECT_EQ(0x45, val[0]); diff --git a/sdk/core/azure-core-amqp/test/ut/management_tests.cpp b/sdk/core/azure-core-amqp/test/ut/management_tests.cpp index e1696ae20..a8d05efa9 100644 --- a/sdk/core/azure-core-amqp/test/ut/management_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/management_tests.cpp @@ -47,6 +47,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { auto openResult = management.Open(); EXPECT_EQ(openResult, ManagementOpenStatus::Error); } + TEST_F(TestManagement, ManagementOpenClose) { MessageTests::AmqpServerMock mockServer; @@ -69,6 +70,56 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { mockServer.StopListening(); } + + TEST_F(TestManagement, ManagementOpenCloseAuthenticated) + { + MessageTests::AmqpServerMock mockServer; + + auto sasCredential = std::make_shared( + "Endpoint=amqp://localhost:" + std::to_string(mockServer.GetPort()) + + "/;SharedAccessKeyName=MyTestKey;SharedAccessKey=abcdabcd;EntityPath=testLocation"); + + ConnectionOptions connectionOptions; + connectionOptions.Port = mockServer.GetPort(); + Connection connection("localhost", sasCredential, connectionOptions); + + Session session{connection.CreateSession({})}; + ManagementClientOptions options; + options.EnableTrace = 1; + ManagementClient management(session.CreateManagementClient("Test", options)); + + mockServer.StartListening(); + + auto openResult = management.Open(); + EXPECT_EQ(openResult, ManagementOpenStatus::Ok); + + management.Close(); + + mockServer.StopListening(); + } + + TEST_F(TestManagement, ManagementOpenCloseError) + { + MessageTests::AmqpServerMock mockServer; + + ConnectionOptions connectionOptions; + connectionOptions.Port = mockServer.GetPort(); + Connection connection("localhost", nullptr, connectionOptions); + + Session session{connection.CreateSession({})}; + ManagementClientOptions options; + options.EnableTrace = 1; + ManagementClient management(session.CreateManagementClient("Test", options)); + + mockServer.StartListening(); + Azure::Core::Context context; + context.Cancel(); + EXPECT_EQ(management.Open(context), ManagementOpenStatus::Cancelled); + + management.Close(); + + mockServer.StopListening(); + } #endif // !defined(AZ_PLATFORM_MAC) #if !defined(AZ_PLATFORM_MAC) @@ -162,7 +213,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { ConnectionOptions connectionOptions; connectionOptions.Port = mockServer.GetPort(); - Connection connection("localhost", nullptr, connectionOptions); + auto sasCredential = std::make_shared( + "Endpoint=amqp://localhost:" + std::to_string(mockServer.GetPort()) + + "/;SharedAccessKeyName=MyTestKey;SharedAccessKey=abcdabcd;EntityPath=testLocation"); + Connection connection("localhost", sasCredential, connectionOptions); Session session{connection.CreateSession({})}; ManagementClientOptions options; options.EnableTrace = 1; diff --git a/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp b/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp index ac7fa7161..a9af5132e 100644 --- a/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp +++ b/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp @@ -23,7 +23,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { extern uint16_t FindAvailableSocket(); - class TestMessages : public testing::Test { + class TestMessageSendReceive : public testing::Test { protected: void SetUp() override {} void TearDown() override {} @@ -33,7 +33,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { using namespace Azure::Core::Amqp; #if !defined(AZ_PLATFORM_MAC) - TEST_F(TestMessages, SimpleReceiver) + TEST_F(TestMessageSendReceive, SimpleReceiver) { // Create a connection @@ -48,8 +48,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { MessageReceiver receiver1(session.CreateMessageReceiver("MySource", {}, nullptr)); MessageReceiver receiver2(session.CreateMessageReceiver("MySource", {}, nullptr)); } + + GTEST_LOG_(INFO) << _internal::MessageReceiverState::Invalid + << _internal::MessageReceiverState::Closing + << _internal::MessageReceiverState::Idle + << _internal::MessageReceiverState::Opening + << _internal::MessageReceiverState::Open + << _internal::MessageReceiverState::Error; + EXPECT_ANY_THROW(GTEST_LOG_(INFO) << static_cast<_internal::MessageReceiverState>(5993)); } - TEST_F(TestMessages, ReceiverProperties) + TEST_F(TestMessageSendReceive, ReceiverProperties) { // Create a connection Connection connection("localhost", nullptr, {}); Session session{connection.CreateSession()}; @@ -70,7 +78,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { } } - TEST_F(TestMessages, SimpleSender) + TEST_F(TestMessageSendReceive, SimpleSender) { // Create a connection @@ -85,8 +93,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { MessageSender sender1(session.CreateMessageSender("MySource", {}, nullptr)); MessageSender sender2(session.CreateMessageSender("MySource", {}, nullptr)); } + GTEST_LOG_(INFO) << _internal::MessageSenderState::Invalid + << _internal::MessageSenderState::Closing + << _internal::MessageSenderState::Idle + << _internal::MessageSenderState::Opening + << _internal::MessageSenderState::Open << _internal::MessageSenderState::Error; + EXPECT_ANY_THROW(GTEST_LOG_(INFO) << static_cast<_internal::MessageSenderState>(5993)); } - TEST_F(TestMessages, SenderProperties) + TEST_F(TestMessageSendReceive, SenderProperties) { // Create a connection Connection connection("localhost", nullptr, {}); Session session{connection.CreateSession()}; @@ -98,7 +112,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { } } - TEST_F(TestMessages, ReceiverOpenClose) + TEST_F(TestMessageSendReceive, ReceiverOpenClose) { MessageTests::AmqpServerMock mockServer; @@ -156,7 +170,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { context.Cancel(); } - TEST_F(TestMessages, SenderOpenClose) + TEST_F(TestMessageSendReceive, SenderOpenClose) { uint16_t testPort = FindAvailableSocket(); GTEST_LOG_(INFO) << "Test port: " << testPort; @@ -181,7 +195,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { listener.Stop(); } - TEST_F(TestMessages, TestLocalhostVsTls) + TEST_F(TestMessageSendReceive, TestLocalhostVsTls) { MessageTests::AmqpServerMock mockServer(5671); @@ -222,6 +236,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { session.CreateMessageSender("localhost/ingress", options, &senderEvents)); EXPECT_NO_THROW(sender.Open()); + EXPECT_EQ(65536, sender.GetMaxMessageSize()); + Azure::Core::Amqp::Models::AmqpMessage message; message.SetBody(Azure::Core::Amqp::Models::AmqpBinaryData{'h', 'e', 'l', 'l', 'o'}); @@ -249,7 +265,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { mockServer.StopListening(); } - TEST_F(TestMessages, SenderSendAsync) + TEST_F(TestMessageSendReceive, SenderSendAsync) { MessageTests::AmqpServerMock mockServer{}; @@ -309,7 +325,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { mockServer.StopListening(); } - TEST_F(TestMessages, SenderSendSync) + TEST_F(TestMessageSendReceive, SenderSendSync) { MessageTests::AmqpServerMock mockServer{}; ConnectionOptions connectionOptions; @@ -352,7 +368,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { mockServer.StopListening(); } - TEST_F(TestMessages, AuthenticatedSender) + TEST_F(TestMessageSendReceive, AuthenticatedSender) { MessageTests::AmqpServerMock server; @@ -388,7 +404,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { server.StopListening(); } - TEST_F(TestMessages, AuthenticatedSenderAzureToken) + TEST_F(TestMessageSendReceive, AuthenticatedSenderAzureToken) { class AzureTokenCredential : public Azure::Core::Credentials::TokenCredential { Azure::Core::Credentials::AccessToken GetToken( @@ -441,7 +457,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { server.StopListening(); } - TEST_F(TestMessages, AuthenticatedReceiver) + TEST_F(TestMessageSendReceive, AuthenticatedReceiver) { class ReceiverMock : public MessageTests::AmqpServerMock { public: @@ -465,8 +481,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { sendMessage.SetBody(Azure::Core::Amqp::Models::AmqpValue{"This is a message body."}); if (m_linkMessageQueues.at(m_senderNodeName).LinkSender) { - m_linkMessageQueues.at(m_senderNodeName).LinkSender->Send(sendMessage); + GTEST_LOG_(INFO) << "Sent, resetting should send."; m_shouldSendMessage = false; + m_linkMessageQueues.at(m_senderNodeName).LinkSender->Send(sendMessage); + } + else + { + GTEST_LOG_(INFO) << "No sender, skipping"; } } } @@ -494,6 +515,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { receiverOptions.MessageTarget = "egress"; receiverOptions.SettleMode = Azure::Core::Amqp::_internal::ReceiverSettleMode::First; receiverOptions.MaxMessageSize = 65536; + receiverOptions.MaxLinkCredit = 500; // We allow at most 500 messages to be received. receiverOptions.Name = "receiver-link"; receiverOptions.EnableTrace = true; MessageReceiver receiver(session.CreateMessageReceiver( @@ -505,6 +527,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { { server.ShouldSendMessage(true); auto message = receiver.WaitForIncomingMessage(); + GTEST_LOG_(INFO) << "Received message."; ASSERT_TRUE(message.first.HasValue()); ASSERT_FALSE(message.second); EXPECT_EQ( @@ -519,11 +542,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { auto message = receiver.WaitForIncomingMessage(receiveContext), Azure::Core::OperationCancelledException); } + + { + auto result = receiver.TryWaitForIncomingMessage(); + EXPECT_FALSE(result.first.HasValue()); + } + + { + GTEST_LOG_(INFO) << "Trigger message send for polling."; + server.ShouldSendMessage(true); + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + GTEST_LOG_(INFO) << "Message should have been sent and processed."; + auto result = receiver.TryWaitForIncomingMessage(); + EXPECT_TRUE(result.first.HasValue()); + } receiver.Close(); server.StopListening(); } - TEST_F(TestMessages, AuthenticatedReceiverAzureToken) + TEST_F(TestMessageSendReceive, AuthenticatedReceiverAzureToken) { class ReceiverMock : public MessageTests::AmqpServerMock { public: @@ -626,5 +663,89 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { receiver.Close(); server.StopListening(); } + + TEST_F(TestMessageSendReceive, AuthenticatedReceiverTryReceive) + { + class ReceiverMock : public MessageTests::AmqpServerMock { + public: + void ShouldSendMessage(bool shouldSend) { m_shouldSendMessage = shouldSend; } + void SetSenderNodeName(std::string const& senderNodeName) + { + m_senderNodeName = senderNodeName; + } + + private: + mutable bool m_shouldSendMessage{false}; + std::string m_senderNodeName; + + void Poll() const override + { + if (m_shouldSendMessage + && m_linkMessageQueues.find(m_senderNodeName) != m_linkMessageQueues.end()) + { + GTEST_LOG_(INFO) << "Sending message to client." + m_senderNodeName; + Azure::Core::Amqp::Models::AmqpMessage sendMessage; + sendMessage.SetBody(Azure::Core::Amqp::Models::AmqpValue{"This is a message body."}); + if (m_linkMessageQueues.at(m_senderNodeName).LinkSender) + { + GTEST_LOG_(INFO) << "Sent, resetting should send."; + m_shouldSendMessage = false; + m_linkMessageQueues.at(m_senderNodeName).LinkSender->Send(sendMessage); + } + else + { + GTEST_LOG_(INFO) << "No sender, skipping"; + } + } + } + }; + + ReceiverMock server; + + auto sasCredential = std::make_shared( + "Endpoint=amqp://localhost:" + std::to_string(server.GetPort()) + + "/;SharedAccessKeyName=MyTestKey;SharedAccessKey=abcdabcd;EntityPath=testLocation"); + + ConnectionOptions connectionOptions; + + // connectionOptions.IdleTimeout = std::chrono::minutes(5); + connectionOptions.ContainerId = testing::UnitTest::GetInstance()->current_test_info()->name(); + connectionOptions.Port = server.GetPort(); + Connection connection("localhost", sasCredential, connectionOptions); + Session session{connection.CreateSession()}; + + server.SetSenderNodeName(sasCredential->GetEndpoint() + sasCredential->GetEntityPath()); + server.StartListening(); + + MessageReceiverOptions receiverOptions; + receiverOptions.Name = "receiver-link"; + receiverOptions.MessageTarget = "egress"; + receiverOptions.SettleMode = Azure::Core::Amqp::_internal::ReceiverSettleMode::First; + receiverOptions.MaxMessageSize = 65536; + receiverOptions.MaxLinkCredit = 500; // We allow at most 500 messages to be received. + receiverOptions.Name = "receiver-link"; + receiverOptions.EnableTrace = true; + MessageReceiver receiver(session.CreateMessageReceiver( + sasCredential->GetEndpoint() + sasCredential->GetEntityPath(), receiverOptions, nullptr)); + + receiver.Open(); + + { + auto result = receiver.TryWaitForIncomingMessage(); + EXPECT_FALSE(result.first.HasValue()); + } + + { + GTEST_LOG_(INFO) << "Trigger message send for polling."; + server.ShouldSendMessage(true); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + GTEST_LOG_(INFO) << "Message should have been sent and processed."; + auto result = receiver.TryWaitForIncomingMessage(); + EXPECT_TRUE(result.first.HasValue()); + } + receiver.Close(); + server.StopListening(); + } + #endif // !defined(AZ_PLATFORM_MAC) }}}} // namespace Azure::Core::Amqp::Tests diff --git a/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp b/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp index d7e3c2e05..7adf87232 100644 --- a/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp +++ b/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp @@ -33,12 +33,12 @@ TEST_F(TestSourceTarget, SimpleSourceTarget) { EXPECT_ANY_THROW(MessageSource source(AmqpValue{})); - AmqpValue val = static_cast(AmqpArray()); + AmqpValue val = AmqpArray().AsAmqpValue(); EXPECT_ANY_THROW(MessageSource source{val}); } { EXPECT_ANY_THROW(MessageTarget target(AmqpValue{})); - AmqpValue val = static_cast(AmqpArray()); + AmqpValue val = AmqpArray().AsAmqpValue(); EXPECT_ANY_THROW(MessageTarget target(val)); } } @@ -79,7 +79,7 @@ TEST_F(TestSourceTarget, TargetProperties) { MessageTargetOptions options; - options.Capabilities.push_back(static_cast(AmqpSymbol{"Test"})); + options.Capabilities.push_back(AmqpSymbol{"Test"}.AsAmqpValue()); MessageTarget target(options); EXPECT_EQ(1, target.GetCapabilities().size()); EXPECT_EQ(AmqpValueType::Symbol, target.GetCapabilities()[0].GetType()); @@ -201,6 +201,19 @@ TEST_F(TestSourceTarget, TargetProperties) } } +TEST_F(TestSourceTarget, TargetCreateCopy) +{ + { + MessageTarget target("address1"); + const AmqpValue v = target.AsAmqpValue(); + // AmqpValue value(v); + + // MessageTarget target2(value); + MessageTarget target2(v); + EXPECT_EQ(target.GetAddress(), target2.GetAddress()); + } +} + TEST_F(TestSourceTarget, TargetThroughValue) { MessageTarget target("address1"); @@ -238,7 +251,7 @@ TEST_F(TestSourceTarget, SourceProperties) { MessageSourceOptions options; - options.Capabilities.push_back(static_cast(AmqpSymbol{"Test"})); + options.Capabilities.push_back(AmqpSymbol{"Test"}.AsAmqpValue()); MessageSource source(options); EXPECT_EQ(1, source.GetCapabilities().size()); EXPECT_EQ(AmqpValueType::Symbol, source.GetCapabilities()[0].GetType()); @@ -374,7 +387,7 @@ TEST_F(TestSourceTarget, SourceProperties) { MessageSourceOptions options; - options.Outcomes.push_back(static_cast(AmqpSymbol("Test"))); + options.Outcomes.push_back(AmqpSymbol("Test").AsAmqpValue()); MessageSource source(options); EXPECT_EQ(1, source.GetOutcomes().size()); EXPECT_EQ(AmqpValueType::Symbol, source.GetOutcomes().at(0).GetType()); diff --git a/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp b/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp index 02b774e21..075f669a7 100644 --- a/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp +++ b/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp @@ -104,6 +104,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { = m_linkMessageQueues[nodeName].MessageSenderPresentQueue.WaitForResult(context); return result != nullptr; } + + std::unique_ptr TryWaitForMessage( + std::string const& nodeName) + { + // Poll for completion on both the mock server and the connection, that ensures that + // we can implement unsolicited sends from the Poll function. + auto result = m_linkMessageQueues[nodeName].MessageQueue.TryWaitForResult(); + if (result) + { + return std::move(std::get<0>(*result)); + } + else + { + Poll(); + return nullptr; + } + } + std::unique_ptr WaitForMessage( std::string const& nodeName) { @@ -133,13 +151,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { std::string const& nodeName, MessageLinkComponents const& linkComponents) { - GTEST_LOG_(INFO) << "Wait for incoming message."; - auto message = WaitForMessage(nodeName); - if (!message) - { - GTEST_LOG_(INFO) << "No message, canceling thread"; - } - else + auto message = TryWaitForMessage(nodeName); + if (message) { GTEST_LOG_(INFO) << "Received message: " << *message; if (nodeName == "$cbs" && IsCbsMessage(*message)) @@ -151,6 +164,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { MessageReceived(nodeName, linkComponents, *message); } } + std::this_thread::yield(); }; public: diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c index 780f8339f..7b5bfd5c2 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c @@ -1731,7 +1731,8 @@ void link_dowork(LINK_HANDLE link) { LogError("NULL link"); } - else + // The caller of link_dowork has no way of determining the state of the link, so skip doing work if the link is not attached. + else if (link->link_state == LINK_STATE_ATTACHED) { tickcounter_ms_t current_tick; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp index 21e7ca13f..cdd2623df 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp @@ -155,6 +155,26 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { */ ReceivedEventData(Azure::Core::Amqp::Models::AmqpMessage const& message); + // Destructor + ~ReceivedEventData() = default; + + /** @brief Copy an ReceivedEventData to another. + */ + ReceivedEventData(ReceivedEventData const&) = default; + + /** @brief Assign an ReceivedEventData to another. + * / + */ + ReceivedEventData& operator=(ReceivedEventData const&) = default; + + /** @brief Create an ReceivedEventData moving from another. + */ + ReceivedEventData(ReceivedEventData&&) = default; + + /** @brief Move an ReceivedEventData to another. + */ + ReceivedEventData& operator=(ReceivedEventData&&) = default; + /** @brief Get the raw AMQP message. * * Returns the underlying AMQP message that was received from the Event Hubs service. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp index 54f24ae5c..35df65c3f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp @@ -6,10 +6,16 @@ #include "private/eventhubs_constants.hpp" #include "private/eventhubs_utilities.hpp" +#include +#include + #include #include #include +using namespace Azure::Core::Diagnostics::_internal; +using namespace Azure::Core::Diagnostics; + namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { EventData::EventData(Azure::Core::Amqp::Models::AmqpMessage const& message) @@ -86,7 +92,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { { // The Key in MessageAnnotations is normally an AmqpSymbol, cast it to a string Key when // placing in SystemProperties. - SystemProperties.emplace(static_cast(key), item.second); + std::string keyName = static_cast(key); + auto result{SystemProperties.emplace(keyName, item.second)}; + if (!result.second) + { + // If the key already exists, log a warning. + Log::Stream(Logger::Level::Warning) + << "Duplicate key in MessageAnnotations: " << key << std::endl; + } } } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp index 1b1629ae5..431184e0d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp @@ -27,7 +27,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { Azure::Core::Amqp::Models::AmqpValue const& filterValue) { Azure::Core::Amqp::Models::AmqpDescribed value{description.Code, filterValue}; - sourceOptions.Filter.emplace(description.Name, value); + sourceOptions.Filter.emplace(description.Name, value.AsAmqpValue()); } FilterDescription SelectorFilter{"apache.org:selector-filter:string", 0x0000468c00000004}; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp index 93e6041f4..8c64cc1ff 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp @@ -157,9 +157,9 @@ TEST_F(EventDataTest, ReceivedEventData) { { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::PartitionKeyAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::PartitionKeyAnnotation} + .AsAmqpValue()] = "PartitionKey"; Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.PartitionKey); @@ -176,12 +176,13 @@ TEST_F(EventDataTest, ReceivedEventData) GTEST_LOG_(INFO) << "timeNow: " << timeNow.ToString(); - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::EnqueuedTimeAnnotation})] - = static_cast( - Azure::Core::Amqp::Models::AmqpTimestamp{ - std::chrono::duration_cast(timeNow.time_since_epoch())}); + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::EnqueuedTimeAnnotation} + .AsAmqpValue()] + = Azure::Core::Amqp::Models::AmqpTimestamp{std::chrono::duration_cast< + std::chrono::milliseconds>( + timeNow.time_since_epoch())} + .AsAmqpValue(); Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.EnqueuedTime.HasValue()); GTEST_LOG_(INFO) << "EnqueuedTime: " << receivedEventData.EnqueuedTime.Value().ToString(); @@ -193,9 +194,9 @@ TEST_F(EventDataTest, ReceivedEventData) { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::SequenceNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::SequenceNumberAnnotation} + .AsAmqpValue()] = static_cast(235); Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.SequenceNumber); @@ -206,9 +207,9 @@ TEST_F(EventDataTest, ReceivedEventData) } { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + .AsAmqpValue()] = 54644; Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.Offset); @@ -219,9 +220,9 @@ TEST_F(EventDataTest, ReceivedEventData) } { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + .AsAmqpValue()] = "54644"; Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.Offset); @@ -232,9 +233,9 @@ TEST_F(EventDataTest, ReceivedEventData) } { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + .AsAmqpValue()] = static_cast(53); Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.Offset); @@ -245,9 +246,9 @@ TEST_F(EventDataTest, ReceivedEventData) } { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + .AsAmqpValue()] = static_cast(57); Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); EXPECT_TRUE(receivedEventData.Offset); @@ -258,9 +259,9 @@ TEST_F(EventDataTest, ReceivedEventData) } { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + .AsAmqpValue()] = static_cast(661011); Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); EXPECT_TRUE(receivedEventData.Offset); @@ -271,9 +272,9 @@ TEST_F(EventDataTest, ReceivedEventData) } { Azure::Core::Amqp::Models::AmqpMessage message; - message.MessageAnnotations[static_cast( - Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})] + message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ + Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + .AsAmqpValue()] = static_cast(1412612); Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); EXPECT_TRUE(receivedEventData.Offset); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp index 0647a7974..a90613ec6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp @@ -111,7 +111,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { ProcessorOptions processorOptions; processorOptions.LoadBalancingStrategy = processorStrategy; processorOptions.UpdateInterval = std::chrono::milliseconds(1000); - processorOptions.Prefetch = 1500; // Set the initial link credits to 1500. Processor processor{consumerClient, checkpointStore, processorOptions}; @@ -594,17 +593,17 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { processor.Stop(); } -// The processor balanced and greedy tests fail when run on Linux or Mac. The tests run fine on -// Windows. For now, disable the tests on Linux and Mac. + // The processor balanced and greedy tests fail when run on Linux or Mac. The tests run fine on + // Windows. For now, disable the tests on Linux and Mac. #if !defined(AZ_PLATFORM_LINUX) && !defined(AZ_PLATFORM_MAC) - TEST_F(ProcessorTest, Processor_Balanced_LIVEONLY_) - { - TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyBalanced); - } - TEST_F(ProcessorTest, Processor_Greedy_LIVEONLY_) - { - TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyGreedy); - } + TEST_F(ProcessorTest, Processor_Balanced_LIVEONLY_) + { + TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyBalanced); + } + TEST_F(ProcessorTest, Processor_Greedy_LIVEONLY_) + { + TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyGreedy); + } #endif #if 0 TEST_F(ProcessorTest, Processor_Balanced_AcquisitionOnly_LIVEONLY_)