Fixed AMQP Link Credits hang; fixed several AMQP memory leaks. (#5119)

* Add link pollable; Fixed several memory leaks

* Rule of 5 fixes for AMQP and EventHubs types; moved message handle type to _detail namespace.

* Update sdk/core/azure-core-amqp/CHANGELOG.md

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>

---------

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>
This commit is contained in:
Larry Osterman 2023-11-09 15:55:54 -08:00 committed by GitHub
parent 0ca8a1d9d5
commit 7b8d324540
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 948 additions and 296 deletions

View File

@ -358,6 +358,21 @@
"curl-transport"
]
},
{
"name": "x64-static-release-perftests-asan",
"displayName": "x64 Release static With Perf Tests and samples, plus ASAN",
"inherits": [
"x64-static",
"release-build",
"enable-tests",
"enable-samples",
"enable-perf",
"enable-address-sanitizer"
],
"cacheVariables": {
"DISABLE_AZURE_CORE_OPENTELEMETRY": true
}
},
{
"name": "x86-msvc-static-debug-perftests",
"displayName": "x86 MSVC Debug static With Perf Tests and samples",
@ -480,14 +495,14 @@
{
"name": "linux-basic-g++",
"displayName": "Linux G++",
"description": "Using compilers: C = /usr/bin/cc, CXX = /usr/bin/c++",
"description": "Using compilers: C = /usr/bin/gcc, CXX = /usr/bin/g++",
"binaryDir": "${sourceDir}/out/build/${presetName}",
"generator": "Ninja",
"hidden": true,
"cacheVariables": {
"CMAKE_INSTALL_PREFIX": "${sourceDir}/out/install/${presetName}",
"CMAKE_C_COMPILER": "/usr/bin/cc",
"CMAKE_CXX_COMPILER": "/usr/bin/c++"
"CMAKE_C_COMPILER": "/usr/bin/gcc",
"CMAKE_CXX_COMPILER": "/usr/bin/g++"
},
"condition": {
"type": "equals",
@ -585,6 +600,7 @@
"name": "g++-debug-asan-tests",
"inherits": [
"linux-g++-debug-tests",
"enable-tests",
"enable-address-sanitizer"
],
"displayName": "Linux g++, ASAN+Debug+Tests"

View File

@ -8,6 +8,9 @@
### Bugs Fixed
- Fixed several memory leaks.
- AMQP Link Credits now work as expected.
### Other Changes
## 1.0.0-beta.5 (2023-11-07)

View File

@ -22,11 +22,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
* the AsyncOperationQueue. WaitForResult will block until a result is available.
*/
template <typename... T> class AsyncOperationQueue final {
public:
AsyncOperationQueue() = default;
~AsyncOperationQueue() = default;
AsyncOperationQueue(const AsyncOperationQueue&) = delete;
AsyncOperationQueue& operator=(const AsyncOperationQueue&) = delete;
AsyncOperationQueue(AsyncOperationQueue&&) = default;
AsyncOperationQueue& operator=(AsyncOperationQueue&&) = default;
void CompleteOperation(T... operationParameters)
{
std::unique_lock<std::mutex> lock(m_operationComplete);

View File

@ -5,6 +5,7 @@
#include <azure/core/azure_assert.hpp>
#include <atomic>
#include <list>
#include <memory>
#include <mutex>
@ -34,6 +35,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
std::list<std::shared_ptr<Pollable>> m_pollables;
std::mutex m_pollablesMutex;
std::thread m_pollingThread;
std::atomic<bool> m_activelyPolling;
bool m_stopped{false};
public:
@ -47,11 +49,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
void AddPollable(std::shared_ptr<Pollable> pollable);
void RemovePollable(std::shared_ptr<Pollable> pollable)
{
std::lock_guard<std::mutex> lock(m_pollablesMutex);
m_pollables.remove(pollable);
}
void RemovePollable(std::shared_ptr<Pollable> pollable);
void AssertIdle()
{

View File

@ -24,6 +24,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
{
ParseConnectionString(connectionString);
}
ConnectionStringParser(const ConnectionStringParser&) = default;
ConnectionStringParser& operator=(const ConnectionStringParser&) = default;
ConnectionStringParser(ConnectionStringParser&&) = default;
ConnectionStringParser& operator=(ConnectionStringParser&&) = default;
~ConnectionStringParser() = default;
std::string const& GetEndpoint() const { return m_endpoint; }
@ -40,7 +44,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
std::string m_sharedAccessKey;
std::string m_uri;
std::string m_hostName;
uint16_t m_port;
uint16_t m_port{};
std::string m_entityPath;
};

View File

@ -23,11 +23,12 @@ namespace Azure { namespace Core { namespace _internal {
using type = BasicUniqueHandle<HEADER_INSTANCE_TAG, FreeAmqpHeader>;
};
}}} // namespace Azure::Core::_internal
namespace Azure { namespace Core { namespace Amqp { namespace Models {
namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _detail {
using UniqueMessageHeaderHandle = Azure::Core::_internal::UniqueHandle<HEADER_INSTANCE_TAG>;
}}}}} // namespace Azure::Core::Amqp::Models::_detail
namespace Azure { namespace Core { namespace Amqp { namespace Models {
/**
* @brief The message header section carries standard delivery details about the transfer of a
* message through the AMQP network.
@ -37,10 +38,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
struct MessageHeader final
{
/** @brief Default constructor. */
MessageHeader() = default;
/** @brief Destructor. */
~MessageHeader() = default;
/** @brief Move constructor.*/
MessageHeader(MessageHeader&& other) = default;
/** @brief Move assignment operator.*/
MessageHeader& operator=(MessageHeader&& other) = default;
/** @brief Copy constructor.*/
MessageHeader(MessageHeader const& other) = default;
/** @brief Copy assignment operator.*/
MessageHeader& operator=(MessageHeader const& other) = default;
/** @brief Compare two AMQP Message Headers for equality.
*
* @param that - the AMQP Message Header to compare to.
@ -126,8 +140,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
*/
struct MessageHeaderFactory
{
static MessageHeader FromUamqp(UniqueMessageHeaderHandle const& properties);
static UniqueMessageHeaderHandle ToUamqp(MessageHeader const& properties);
static MessageHeader FromUamqp(_detail::UniqueMessageHeaderHandle const& properties);
static _detail::UniqueMessageHeaderHandle ToUamqp(MessageHeader const& properties);
};
}}}}} // namespace Azure::Core::Amqp::Models::_internal

View File

@ -101,13 +101,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*
* By default, AMQP uses 0, however services can override this to
* express additional semantics about the message payload.
*
* For more information, see [AMQP Message
* Format](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format).
*/
uint32_t MessageFormat = AmqpDefaultMessageFormatValue;
/** @brief The header for the message.
*
* For more information, see [AMQP Message
* Format](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format).
* Header](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header)
*
*/
MessageHeader Header;
@ -141,8 +145,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
/** @brief Delivery Tag for the message.
*
* For more information, see [AMQP Application
* Properties](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties).
* For more information, see [AMQP Transport
* Transfer](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-delivery-tag).
*/
AmqpValue DeliveryTag;

View File

@ -37,9 +37,30 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
struct MessageProperties final
{
/** @brief Default constructor.
*
* Creates an empty MessageProperties object.
*/
MessageProperties() = default;
/** @brief Destructor.
*
* Destroys the MessageProperties object.
*/
~MessageProperties() = default;
/** @brief Move Constructor. */
MessageProperties(MessageProperties&& other) = default;
/** @brief Move Assignment operator. */
MessageProperties& operator=(MessageProperties&& other) = default;
/** @brief Copy Constructor. */
MessageProperties(MessageProperties const& other) = default;
/** @brief Copy Assignment operator. */
MessageProperties& operator=(MessageProperties const& other) = default;
/** @brief The message-id, if set, uniquely identifies a message within the message system.
* The message producer is usually responsible for setting the message-id in such a way that
* it is assured to be globally unique. A broker MAY discard a message as a duplicate if the

View File

@ -5,6 +5,8 @@
#include "amqp_header.hpp"
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/core/internal/unique_handle.hpp>
#include <azure/core/uuid.hpp>
@ -32,6 +34,7 @@ namespace Azure { namespace Core { namespace _internal {
namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _detail {
using UniqueAmqpValueHandle = Azure::Core::_internal::UniqueHandle<AMQP_VALUE_DATA_TAG>;
std::ostream& operator<<(std::ostream& os, AMQP_VALUE_DATA_TAG const* value);
}}}}} // namespace Azure::Core::Amqp::Models::_detail
namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal {
@ -111,6 +114,29 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
AmqpValue() noexcept;
~AmqpValue();
/** @brief Construct an AMQP Value from an existing AMQP Value
* @param that - source value to copy.
*/
AmqpValue(AmqpValue const& that) noexcept;
/** @brief Move an AMQP Value to another existing AMQP Value
* @param that - source value to move.
*/
AmqpValue(AmqpValue&& that) noexcept;
/** @brief Copy an AMQP value to the current AMQP value.
*
* @param that the other AMQP Value to copy.
* @returns "this".
*/
AmqpValue& operator=(AmqpValue const& that);
/** @brief Move an AMQP value to the current AMQP value.
*
* @param that the other AMQP Value to move.
* @returns "this".
*/
AmqpValue& operator=(AmqpValue&& that) noexcept;
/** @brief Construct an AMQP boolean value.
*
* Defined in [AMQP Core Types
@ -282,16 +308,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
AmqpValue(Azure::Core::Uuid const& value);
/** @brief Construct an AMQP Value from an existing AMQP Value
* @param that - source value to copy.
*/
AmqpValue(AmqpValue const& that) noexcept;
/** @brief Move an AMQP Value to another existing AMQP Value
* @param that - source value to move.
*/
AmqpValue(AmqpValue&& that) noexcept;
// Interoperability functions for uAMQP
/// @cond
@ -304,29 +320,29 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
operator AMQP_VALUE_DATA_TAG*() const;
/** @brief Interoperability helper function which releases the internal AmqpValue.
*
* @returns uAMQP AMQP_VALUE object.
*
* @remarks This is an internal operator which should not be called by customers.
*/
AMQP_VALUE_DATA_TAG* Release();
/** @brief Interoperability helper function which creates an AmqpValue from a uAMQP
* AMQP_VALUE object.
*
* @param value source uAMQP AMQP_VALUE object.
*
* @remarks This is an internal operator which should not be called by customers.
*
* @remarks Note that this does NOT capture the passed in AMQP_VALUE object, instead it clones
* the underlying AMQP_VALUE. This is why it takes a UniqueAmqpValueHandle as a parameter
* instead of a raw AMQP_VALUE - that ensures that someone will free the underlying AMQP_VALUE
* if needed.
*/
AmqpValue(AMQP_VALUE_DATA_TAG* value);
AmqpValue(_detail::UniqueAmqpValueHandle const& value);
/// @endcond
/** @brief Copy an AMQP value to the current AMQP value.
*
* @param that the other AMQP Value to copy.
* @returns "this".
*/
AmqpValue& operator=(AmqpValue const& that);
/** @brief Move an AMQP value to the current AMQP value.
*
* @param that the other AMQP Value to move.
* @returns "this".
*/
AmqpValue& operator=(AmqpValue&& that) noexcept;
/** @brief Equality comparison operator.
* @param that - Value to compare to this value.
* @returns true if the that is equal to this.
@ -565,6 +581,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
static AmqpValue Deserialize(uint8_t const* data, size_t size);
private:
AmqpValue(AMQP_VALUE_DATA_TAG*) = delete;
_detail::UniqueAmqpValueHandle m_value;
};
std::ostream& operator<<(std::ostream& os, AmqpValue const& value);
@ -586,7 +603,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
AmqpCollectionBase(initializer_type const& initializer) : m_value{initializer} {}
AmqpCollectionBase(T initializer) : m_value{initializer} {}
AmqpCollectionBase() {}
AmqpCollectionBase(){};
// Copy constructor
AmqpCollectionBase(const AmqpCollectionBase& other) = default;
// Copy assignment operator
AmqpCollectionBase& operator=(const AmqpCollectionBase& other) = default;
// Move constructor
AmqpCollectionBase(AmqpCollectionBase&& other) noexcept = default;
// Move assignment operator
AmqpCollectionBase& operator=(AmqpCollectionBase&& other) = default;
~AmqpCollectionBase() = default;
public:
/** @brief Returns the underlying value.
@ -594,11 +625,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
explicit operator T const &() const { return m_value; }
/** @brief Convert this collection type to an AMQP value.*/
explicit operator AmqpValue() const { return static_cast<UniqueAmqpValueHandle>(*this).get(); }
AmqpValue AsAmqpValue() const { return AmqpValue{UniqueAmqpValueHandle{*this}}; }
/** @brief Returns the size of the underlying value.*/
inline typename T::size_type size() const { return m_value.size(); }
/// @brief Array accessor operator.
/// @param pos Position of an element in the container.
/// @return element value.
const typename T::value_type& operator[](const typename T::size_type pos) const noexcept
{
return m_value.operator[](pos);
@ -619,6 +653,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
/** @brief Returns true if the underlying value is empty.*/
bool empty() const noexcept { return m_value.empty(); }
protected:
/**
* @brief Convert an AmqpCollectionBase instance to a uAMQP AMQP_VALUE.
*
@ -626,7 +661,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
* implementation.
*
*/
operator UniqueAmqpValueHandle() const;
virtual operator UniqueAmqpValueHandle() const;
};
}}}}} // namespace Azure::Core::Amqp::Models::_detail
@ -643,6 +678,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
/** @brief Construct a new AmqpArray object with an initializer list. */
AmqpArray(initializer_type const& values);
/** @brief Destroy an array. */
virtual ~AmqpArray() = default;
/** @brief Copy constructor */
AmqpArray(const AmqpArray& other) = default;
/** @brief Copy assignment operator */
AmqpArray& operator=(const AmqpArray& other) = default;
/** @brief Move constructor */
AmqpArray(AmqpArray&& other) noexcept = default;
/** @brief Move assignment operator */
AmqpArray& operator=(AmqpArray&& other) noexcept = default;
/** @brief Construct a new AmqpArray object from an existing uAMQP AMQP_VALUE item
* @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is
* responsible for freeing that object.
@ -674,6 +724,18 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
{
}
/** @brief Move Constructor */
AmqpMap(AmqpMap&& other) noexcept = default;
/** @brief Move assignment operator */
AmqpMap& operator=(AmqpMap&& other) noexcept = default;
/** @brief Copy Constructor */
AmqpMap(const AmqpMap& other) = default;
/** @brief Copy assignment operator */
AmqpMap& operator=(const AmqpMap& other) = default;
/** @brief Construct a new AmqpMap object from an existing uAMQP AMQP_VALUE item
* @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is
* responsible for freeing that object.
@ -685,6 +747,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
AmqpMap(AMQP_VALUE_DATA_TAG* const value);
virtual ~AmqpMap() = default;
/** @brief Access an element in the map.
*
* @return a reference to the value associated with the specified key.
@ -714,12 +778,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
class AmqpList final : public _detail::AmqpCollectionBase<std::vector<AmqpValue>, AmqpList> {
public:
AmqpList() : AmqpCollectionBase(){};
virtual ~AmqpList() = default;
/** @brief Construct a new AmqpList object with an initializer list. */
AmqpList(std::initializer_list<std::vector<AmqpValue>::value_type> const& values)
: AmqpCollectionBase(values)
{
}
/** @brief Copy Constructor */
AmqpList(const AmqpList& other) = default;
/** @brief Copy assignment operator */
AmqpList& operator=(const AmqpList& other) = default;
/** @brief Move Constructor */
AmqpList(AmqpList&& other) noexcept = default;
/** @brief Move assignment operator */
AmqpList& operator=(AmqpList&& other) noexcept = default;
/** @brief Construct a new AmqpList object from an existing uAMQP AMQP_VALUE item
*
* @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is
@ -744,11 +821,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
: public _detail::AmqpCollectionBase<std::vector<std::uint8_t>, AmqpBinaryData> {
public:
AmqpBinaryData() : AmqpCollectionBase(){};
virtual ~AmqpBinaryData() = default;
/** @brief Construct a new AmqpBinaryData object with an initializer list. */
AmqpBinaryData(initializer_type const& values) : AmqpCollectionBase(values){};
/** @brief Construct a new AmqpBinaryData from a vector of bytes. */
AmqpBinaryData(std::vector<std::uint8_t> const& values) : AmqpCollectionBase(values){};
/** @brief Copy constructor */
AmqpBinaryData(const AmqpBinaryData& other) = default;
/** @brief Copy assignment operator */
AmqpBinaryData& operator=(const AmqpBinaryData& other) = default;
/** @brief Move Constructor */
AmqpBinaryData(AmqpBinaryData&& other) noexcept = default;
/** @brief Move assignment operator */
AmqpBinaryData& operator=(AmqpBinaryData&& other) noexcept = default;
/** @brief Assign a vector of bytes to the current AmqpBinaryData. */
AmqpBinaryData& operator=(std::vector<std::uint8_t> const& values)
{
@ -780,6 +870,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
class AmqpSymbol final : public _detail::AmqpCollectionBase<std::string, AmqpSymbol> {
public:
AmqpSymbol() : AmqpCollectionBase(){};
virtual ~AmqpSymbol() = default;
/** @brief Construct a new AmqpSymbol object with an initializer list. */
AmqpSymbol(std::string const& values) : AmqpCollectionBase(values){};
@ -789,6 +881,18 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
/** @brief Construct a new AmqpSymbol object from a constant string value. */
AmqpSymbol(const char* const values) : AmqpCollectionBase(values){};
/** @brief Copy constructor */
AmqpSymbol(const AmqpSymbol& other) = default;
/** @brief Copy Assignment operator */
AmqpSymbol& operator=(const AmqpSymbol& other) = default;
/** @brief Move constructor */
AmqpSymbol(AmqpSymbol&& other) noexcept = default;
/** @brief Move Assignment operator */
AmqpSymbol& operator=(AmqpSymbol&& other) noexcept = default;
/** @brief Construct a new AmqpSymbol object from an existing uAMQP AMQP_VALUE item
*
* @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is
@ -830,6 +934,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
public:
AmqpTimestamp();
~AmqpTimestamp() = default;
/** @brief Construct a new AmqpTimestamp object . */
AmqpTimestamp(std::chrono::milliseconds const& values);
@ -845,10 +951,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
AmqpTimestamp(AMQP_VALUE_DATA_TAG* const value);
/**
* @brief Convert an existing AmqpSymbol to an AmqpValue.
*/
explicit operator AmqpValue() const;
/** @brief Copy constructor */
AmqpTimestamp(const AmqpTimestamp& other) = default;
/** @brief Copy Assignment operator */
AmqpTimestamp& operator=(const AmqpTimestamp& other) = default;
/** @brief Move constructor */
AmqpTimestamp(AmqpTimestamp&& other) noexcept = default;
/** @brief Move Assignment operator */
AmqpTimestamp& operator=(AmqpTimestamp&& other) noexcept = default;
/**
* @brief Convert an AmqpSymbol instance to a uAMQP AMQP_VALUE.
@ -861,6 +974,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
operator _detail::UniqueAmqpValueHandle() const;
/** @brief Convert this AmqpTimestamp to an AmqpValue.
*
* @return An AmqpValue containing this AmqpTimestamp.
*/
AmqpValue AsAmqpValue() const { return AmqpValue{_detail::UniqueAmqpValueHandle{*this}}; }
/**
* @brief Convert an AmqpTimestamp instance to a std::chrono::milliseconds.
*/
@ -885,17 +1004,37 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
* @note The AMQP Composite type representation does NOT include the underlying field names,
* just the field values.
*
*
*/
class AmqpComposite final
: public _detail::AmqpCollectionBase<std::vector<AmqpValue>, AmqpComposite> {
public:
/** @brief Construct a new AmqpComposite object. */
AmqpComposite() : AmqpCollectionBase(){};
virtual ~AmqpComposite() = default;
/** @brief Construct a new AmqpComposite object with an initializer list. */
AmqpComposite(AmqpValue const& descriptor, std::initializer_list<AmqpValue> const& values);
/** @brief Construct a new AmqpComposite object from another */
AmqpComposite(const AmqpComposite& other) = default;
/** @brief Copy assignment operator.
* @param other composite value to copy.
* @returns reference to this composite value.
*/
AmqpComposite& operator=(const AmqpComposite& other) = default;
/** @brief Move constructor.
* @param other composite value to move.
*/
AmqpComposite(AmqpComposite&& other) noexcept = default;
/** @brief Move assignment operator.
* @param other composite value to move.
* @returns reference to this composite value.
*/
AmqpComposite& operator=(AmqpComposite&& other) noexcept = default;
/** @brief Construct a new AmqpComposite object from an existing uAMQP AMQP_VALUE item
* @remarks Note that this does NOT capture the passed in AMQP_VALUE object, the caller is
* responsible for freeing that object.
@ -932,6 +1071,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
AmqpValue const& GetDescriptor() const { return m_descriptor; }
/**
* @brief Convert an existing AmqpComposite to an AmqpValue.
*/
explicit operator AmqpValue() const
{
return static_cast<_detail::UniqueAmqpValueHandle>(*this);
}
protected:
/**
* @brief Convert an AmqpComposite instance to a uAMQP AMQP_VALUE.
*
@ -942,13 +1090,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
* by the caller.
*/
operator _detail::UniqueAmqpValueHandle() const;
/**
* @brief Convert an existing AmqpComposite to an AmqpValue.
*/
explicit operator AmqpValue() const
{
return static_cast<_detail::UniqueAmqpValueHandle>(*this).get();
}
private:
AmqpValue m_descriptor;
@ -975,6 +1116,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
AmqpDescribed(AmqpSymbol const& descriptor, AmqpValue const& value);
~AmqpDescribed() = default;
/** @brief Construct a new AmqpDescribed object with a 64bit descriptor.
*
* @param descriptor - the Descriptor for the described value. The descriptor value SHOULD
@ -1041,6 +1184,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
operator _detail::UniqueAmqpValueHandle() const;
/** @brief Convert this AmqpDescribed to an AmqpValue.
*
* @return An AmqpValue containing this AmqpDescribed.
*/
AmqpValue AsAmqpValue() const;
/**
* @brief Compare this AmqpDescribed value with another.
*

View File

@ -269,9 +269,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
throw std::runtime_error("Failed to set max frame size.");
}
if (connection_set_properties(
m_connection.get(),
static_cast<Models::_detail::UniqueAmqpValueHandle>(m_options.Properties).get()))
if (connection_set_properties(m_connection.get(), m_options.Properties.AsAmqpValue()))
{
throw std::runtime_error("Failed to set connection properties.");
}
@ -500,11 +498,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
throw std::logic_error("Connection already closed.");
}
std::unique_lock<LockType> lock(m_amqpMutex);
// Stop polling on this connection, we're shutting it down.
EnableAsyncOperation(false);
std::unique_lock<LockType> lock(m_amqpMutex);
if (m_connectionOpened)
{
if (connection_close(
@ -556,7 +554,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
throw std::runtime_error("COuld not get properties.");
}
return Models::AmqpValue{value}.AsMap();
return Models::AmqpValue{Models::_detail::UniqueAmqpValueHandle{value}}.AsMap();
}
uint32_t ConnectionImpl::GetRemoteMaxFrameSize() const

View File

@ -314,7 +314,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
void LinkImpl::SetDesiredCapabilities(Models::AmqpValue desiredCapabilities)
{
if (link_set_desired_capabilities(m_link, desiredCapabilities))
{
throw std::runtime_error("Could not set desired capabilities.");
@ -328,7 +327,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
throw std::runtime_error("Could not convert field to header.");
}
return Models::AmqpValue{desiredCapabilitiesVal};
return Models::AmqpValue{Models::_detail::UniqueAmqpValueHandle{desiredCapabilitiesVal}};
}
void LinkImpl::SubscribeToDetachEvent(OnLinkDetachEvent onLinkDetach)
@ -355,6 +354,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
}
void LinkImpl::Poll()
{
// Ensure that the connection hierarchy's state is not modified while polling on the link.
auto lock{m_session->GetConnection()->Lock()};
link_dowork(m_link);
}
void LinkImpl::ResetLinkCredit(std::uint32_t linkCredit, bool drain)
{
if (link_reset_link_credit(m_link, linkCredit, drain))

View File

@ -112,7 +112,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
try
{
m_messageSender->Open(context);
m_messageSenderOpen = true;
m_messageReceiver->Open(context);
m_messageReceiverOpen = true;
}
catch (std::runtime_error const& e)
{

View File

@ -211,7 +211,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
m_link->SetMaxLinkCredit(m_options.MaxLinkCredit);
}
m_link->SetAttachProperties(static_cast<Models::AmqpValue>(m_options.Properties));
m_link->SetAttachProperties(m_options.Properties.AsAmqpValue());
}
AMQP_VALUE MessageReceiverImpl::OnMessageReceivedFn(const void* context, MESSAGE_HANDLE message)
@ -234,7 +234,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
rv = receiver->OnMessageReceived(incomingMessage);
}
return amqpvalue_clone(rv);
return rv.Release();
}
return Models::_internal::Messaging::DeliveryRejected(
@ -474,6 +474,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
// Mark the connection as async so that we can use the async APIs.
m_session->GetConnection()->EnableAsyncOperation(true);
// And add the link to the list of pollable items.
Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link);
}
void MessageReceiverImpl::Close()
@ -485,7 +488,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Log::Stream(Logger::Level::Verbose) << "Lock for Closing message receiver.";
}
auto lock{m_session->GetConnection()->Lock()};
AZURE_ASSERT(m_link);
if (m_options.EnableTrace)
@ -497,8 +499,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
Log::Stream(Logger::Level::Verbose) << "Closing message receiver. Stop async";
}
Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable(
m_link); // This will ensure that the link is cleaned up on the next poll()
m_session->GetConnection()->EnableAsyncOperation(false);
auto lock{m_session->GetConnection()->Lock()};
// Clear messages from the queue.
m_messageQueue.Clear();
if (messagereceiver_close(m_messageReceiver.get()))

View File

@ -314,6 +314,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Log::Stream(Logger::Level::Verbose) << "Opening message sender. Enable async operation.";
}
m_session->GetConnection()->EnableAsyncOperation(true);
// Enable async on the link as well.
Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link);
m_senderOpen = true;
}
@ -325,7 +329,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
Log::Stream(Logger::Level::Verbose) << "Lock for Closing message sender.";
}
auto lock{m_session->GetConnection()->Lock()};
if (m_options.EnableTrace)
{
@ -333,6 +336,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Log::Stream(Logger::Level::Verbose) << "Unsubscribe from link detach event.";
}
m_link->UnsubscribeFromDetachEvent();
Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable(
m_link); // This will ensure that the link is cleaned up on the next poll()
#if SENDER_SYNCHRONOUS_CLOSE
bool shouldWaitForClose = m_currentState == _internal::MessageSenderState::Closing
|| m_currentState == _internal::MessageSenderState::Open;
@ -340,6 +347,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
m_session->GetConnection()->EnableAsyncOperation(false);
auto lock{m_session->GetConnection()->Lock()};
if (messagesender_close(m_messageSender.get()))
{
throw std::runtime_error("Could not close message sender");
@ -401,7 +410,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
result = _internal::MessageSendStatus::Timeout;
break;
}
onComplete(result, disposition);
// Reference disposition so that we don't over-release when the AmqpValue passed to OnComplete
// is destroyed.
onComplete(result, Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(disposition)});
}
};

View File

@ -3,6 +3,7 @@
#pragma once
#include "azure/core/amqp/internal/common/global_state.hpp"
#include "azure/core/amqp/internal/models/amqp_error.hpp"
#include "azure/core/amqp/internal/models/message_source.hpp"
#include "azure/core/amqp/internal/models/message_target.hpp"
@ -17,7 +18,8 @@
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
class LinkImpl final : public std::enable_shared_from_this<LinkImpl> {
class LinkImpl final : public std::enable_shared_from_this<LinkImpl>,
public Common::_detail::Pollable {
using OnLinkDetachEvent = std::function<void(Models::_internal::AmqpError)>;
@ -96,5 +98,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE m_linkSubscriptionHandle{};
static void OnLinkDetachEventFn(void* context, ERROR_HANDLE error);
// Inherited via Pollable
void Poll() override;
};
}}}} // namespace Azure::Core::Amqp::_detail

View File

@ -235,15 +235,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
_internal::LinkEndpoint linkEndpoint(LinkEndpointFactory::CreateLinkEndpoint(newLinkEndpoint));
if (session->m_eventHandler)
{
// The input source, target, and properties are owned by the caller, so we need to clone
// them before putting them in a UniqueAmqpValueHandle so we can construct an AmqpValue.
return session->m_eventHandler->OnLinkAttached(
_detail::SessionFactory::CreateFromInternal(session->shared_from_this()),
linkEndpoint,
name,
role == role_receiver ? Azure::Core::Amqp::_internal::SessionRole::Receiver
: Azure::Core::Amqp::_internal::SessionRole::Sender,
source,
target,
properties);
Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(source)},
Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(target)},
Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(properties)});
}
else
{

View File

@ -8,6 +8,7 @@
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure_c_shared_utility/gballoc.h>
#include <azure_c_shared_utility/platform.h>
#include <azure_c_shared_utility/xlogging.h>
@ -23,6 +24,7 @@
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;
// cspell: words gballoc
namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail {
// Logging callback for uAMQP and azure-c-shared-utility.
@ -87,6 +89,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
GlobalStateHolder::GlobalStateHolder()
{
#if defined(GB_DEBUG_ALLOC)
gballoc_init();
#endif
if (platform_init())
{
throw std::runtime_error("Could not initialize platform.");
@ -98,21 +103,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
m_pollingThread = std::thread([this]() {
do
{
std::list<std::shared_ptr<Pollable>> capturedList;
{
std::unique_lock<std::mutex> lock{m_pollablesMutex};
// If there are no pollables, there's no point in doing any work.
if (m_pollables.empty())
std::list<std::shared_ptr<Pollable>> capturedList;
{
continue;
std::unique_lock<std::mutex> lock{m_pollablesMutex};
// If there are no pollables, there's no point in doing any work.
if (m_pollables.empty())
{
continue;
}
capturedList = m_pollables;
m_activelyPolling = true;
}
capturedList = m_pollables;
}
for (auto const& pollable : capturedList)
{
pollable->Poll();
for (auto const& pollable : capturedList)
{
pollable->Poll();
}
}
m_activelyPolling = false;
// std::this_thread::yield();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} while (!m_stopped);
@ -127,6 +136,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
m_pollingThread.join();
}
platform_deinit();
#if defined(GB_DEBUG_ALLOC)
gballoc_deinit();
#endif
}
void GlobalStateHolder::AddPollable(std::shared_ptr<Pollable> pollable)
@ -138,6 +150,35 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
}
}
void GlobalStateHolder::RemovePollable(std::shared_ptr<Pollable> pollable)
{
// There is a bit of a complicated lock-free dance happening here.
// The m_pollables list is accessed by the polling thread, and the list is modified by the user
// thread. To ensure integrity of the list, the polling thread takes the lock, copies the
// pollable from the list, releases the lock and then iterates over the pollables at the
// snapshot.
//
// Because the pollable is a shared_ptr, the user thread can remove a pollable while the
// background thread is polling.
//
// But we want to make sure that the thread has finished polling (and thus has removed the copy
// of the pollables list). For that, we have the m_activelyPolling variable. It is set under the
// pollables lock, and cleared after the polling thread has finished polling (outside the lock).
//
// This means that we can spin on the m_activelyPolling variable *under* the pollables lock safe
// in the knowledge that IF the variable is set to true, it means that we acquired the
// pollablesMutex during the interval when the captured list is being interated over. And that
// the m_activelyPolling variable will only be cleared AFTER the captured list is freed.
//
std::lock_guard<std::mutex> lock(m_pollablesMutex);
m_pollables.remove(pollable);
// Spin until m_activelyPolling is false, this ensures that the polling thread is not using the
// capturedList copy of m_pollables.
while (m_activelyPolling.load())
;
}
GlobalStateHolder* GlobalStateHolder::GlobalStateInstance()
{
static GlobalStateHolder globalState;

View File

@ -42,7 +42,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
AMQP_VALUE info;
if (!error_get_info(handle, &info) && info)
{
rv.Info = AmqpValue{info}.AsMap();
// error_get_info returns the AMQP value in place, so we clone it before passing it to the
// UniqueAmqpValueHandle.
rv.Info = AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(info)}}.AsMap();
}
return rv;
@ -57,14 +59,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
if (!error.Info.empty())
{
error_set_info(errorHandle.get(), AmqpValue{error.Info});
AmqpValue infoValue(error.Info.AsAmqpValue());
error_set_info(errorHandle.get(), infoValue);
}
// amqpvalue_create_error clones the error handle, so we remember it separately.
_detail::UniqueAmqpValueHandle handleAsValue{amqpvalue_create_error(errorHandle.get())};
// The AmqpValue constructor will clone the handle passed into it.
// The UniqueAmqpValueHandle will take care of freeing the cloned handle.
return handleAsValue.get();
return handleAsValue;
}
std::ostream& operator<<(std::ostream& os, AmqpError const& error)
{

View File

@ -29,7 +29,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
: true);
}
MessageHeader _internal::MessageHeaderFactory::FromUamqp(UniqueMessageHeaderHandle const& handle)
MessageHeader _internal::MessageHeaderFactory::FromUamqp(
_detail::UniqueMessageHeaderHandle const& handle)
{
MessageHeader rv;
bool boolValue;
@ -63,9 +64,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
return rv;
}
UniqueMessageHeaderHandle _internal::MessageHeaderFactory::ToUamqp(MessageHeader const& header)
_detail::UniqueMessageHeaderHandle _internal::MessageHeaderFactory::ToUamqp(
MessageHeader const& header)
{
UniqueMessageHeaderHandle rv{header_create()};
_detail::UniqueMessageHeaderHandle rv{header_create()};
if (header.Durable)
{
if (header_set_durable(rv.get(), header.Durable))
@ -130,14 +132,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
size_t MessageHeader::GetSerializedSize(MessageHeader const& properties)
{
auto handle = _internal::MessageHeaderFactory::ToUamqp(properties);
AmqpValue propertiesAsValue{amqpvalue_create_header(handle.get())};
AmqpValue propertiesAsValue{
Models::_detail::UniqueAmqpValueHandle{amqpvalue_create_header(handle.get())}};
return AmqpValue::GetSerializedSize(propertiesAsValue);
}
std::vector<uint8_t> MessageHeader::Serialize(MessageHeader const& header)
{
auto handle = _internal::MessageHeaderFactory::ToUamqp(header);
AmqpValue headerAsValue{amqpvalue_create_header(handle.get())};
AmqpValue headerAsValue{
Models::_detail::UniqueAmqpValueHandle{amqpvalue_create_header(handle.get())}};
return Models::AmqpValue::Serialize(headerAsValue);
}
@ -149,7 +153,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
{
throw std::runtime_error("Could not convert value to AMQP Header.");
}
UniqueMessageHeaderHandle uniqueHandle{handle};
_detail::UniqueMessageHeaderHandle uniqueHandle{handle};
handle = nullptr;
return _internal::MessageHeaderFactory::FromUamqp(uniqueHandle);
}

View File

@ -77,7 +77,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!message_get_delivery_annotations(message, &annotationsVal) && annotationsVal != nullptr)
{
UniqueAmqpValueHandle deliveryAnnotations(annotationsVal);
auto deliveryMap = AmqpValue{deliveryAnnotations.get()}.AsMap();
auto deliveryMap = AmqpValue{deliveryAnnotations}.AsMap();
rv.DeliveryAnnotations = deliveryMap;
}
}
@ -89,7 +89,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
UniqueAmqpValueHandle messageAnnotations(annotationVal);
if (messageAnnotations)
{
auto messageMap = AmqpValue{messageAnnotations.get()}.AsMap();
auto messageMap = AmqpValue{messageAnnotations}.AsMap();
rv.MessageAnnotations = messageMap;
}
}
@ -159,7 +159,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!message_get_delivery_tag(message, &deliveryTagVal))
{
UniqueAmqpValueHandle deliveryTag(deliveryTagVal);
rv.DeliveryTag = AmqpValue{deliveryTag.get()};
rv.DeliveryTag = AmqpValue{deliveryTag};
}
}
{
@ -168,7 +168,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
{
UniqueAmqpValueHandle footerAnnotations(footerVal);
footerVal = nullptr;
auto footerMap = AmqpValue{footerAnnotations.get()}.AsMap();
auto footerMap = AmqpValue{footerAnnotations}.AsMap();
rv.Footer = footerMap;
}
}
@ -220,7 +220,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
AMQP_VALUE bodyValue;
if (!message_get_body_amqp_value_in_place(message, &bodyValue))
{
rv.m_amqpValueBody = bodyValue;
rv.m_amqpValueBody = _detail::UniqueAmqpValueHandle{amqpvalue_clone(bodyValue)};
}
rv.BodyType = MessageBodyType::Value;
}
@ -258,8 +258,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!message.DeliveryAnnotations.empty())
{
if (message_set_delivery_annotations(
rv.get(), static_cast<UniqueAmqpValueHandle>(message.DeliveryAnnotations).get()))
if (message_set_delivery_annotations(rv.get(), message.DeliveryAnnotations.AsAmqpValue()))
{
throw std::runtime_error("Could not set delivery annotations.");
}
@ -267,8 +266,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!message.MessageAnnotations.empty())
{
if (message_set_message_annotations(
rv.get(), static_cast<UniqueAmqpValueHandle>(message.MessageAnnotations).get()))
if (message_set_message_annotations(rv.get(), message.MessageAnnotations.AsAmqpValue()))
{
throw std::runtime_error("Could not set message annotations.");
}
@ -289,8 +287,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
appProperties.emplace(val);
}
if (message_set_application_properties(
rv.get(), static_cast<UniqueAmqpValueHandle>(appProperties).get()))
if (message_set_application_properties(rv.get(), appProperties.AsAmqpValue()))
{
throw std::runtime_error("Could not set application properties.");
}
@ -298,8 +295,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!message.DeliveryTag.IsNull())
{
if (message_set_delivery_tag(
rv.get(), static_cast<UniqueAmqpValueHandle>(message.DeliveryTag).get()))
if (message_set_delivery_tag(rv.get(), message.DeliveryTag))
{
throw std::runtime_error("Could not set delivery tag.");
}
@ -307,7 +303,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!message.Footer.empty())
{
if (message_set_footer(rv.get(), static_cast<UniqueAmqpValueHandle>(message.Footer).get()))
if (message_set_footer(rv.get(), message.Footer.AsAmqpValue()))
{
throw std::runtime_error("Could not set message annotations.");
}
@ -331,7 +327,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
case MessageBodyType::Sequence:
for (auto const& sequenceVal : message.m_amqpSequenceBody)
{
if (message_add_body_amqp_sequence(rv.get(), AmqpValue(sequenceVal)))
if (message_add_body_amqp_sequence(rv.get(), sequenceVal.AsAmqpValue()))
{
throw std::runtime_error("Could not set message body AMQP sequence value.");
}
@ -424,16 +420,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
if (!message.DeliveryAnnotations.empty())
{
AmqpValue deliveryAnnotations{
amqpvalue_create_delivery_annotations(AmqpValue{message.DeliveryAnnotations})};
AmqpValue deliveryAnnotations{Models::_detail::UniqueAmqpValueHandle{
amqpvalue_create_delivery_annotations(message.DeliveryAnnotations.AsAmqpValue())}};
auto serializedDeliveryAnnotations = AmqpValue::Serialize(deliveryAnnotations);
rv.insert(
rv.end(), serializedDeliveryAnnotations.begin(), serializedDeliveryAnnotations.end());
}
if (!message.MessageAnnotations.empty())
{
AmqpValue messageAnnotations{
amqpvalue_create_message_annotations(AmqpValue{message.MessageAnnotations})};
AmqpValue messageAnnotations{Models::_detail::UniqueAmqpValueHandle{
amqpvalue_create_message_annotations(message.MessageAnnotations.AsAmqpValue())}};
auto serializedAnnotations = AmqpValue::Serialize(messageAnnotations);
rv.insert(rv.end(), serializedAnnotations.begin(), serializedAnnotations.end());
}
@ -459,7 +455,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
appProperties.emplace(val);
}
AmqpValue propertiesValue{amqpvalue_create_application_properties(AmqpValue{appProperties})};
AmqpValue propertiesValue{Models::_detail::UniqueAmqpValueHandle{
amqpvalue_create_application_properties(appProperties.AsAmqpValue())}};
auto serializedApplicationProperties = AmqpValue::Serialize(propertiesValue);
rv.insert(
rv.end(), serializedApplicationProperties.begin(), serializedApplicationProperties.end());
@ -476,7 +473,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
// described body.
AmqpDescribed describedBody(
static_cast<std::uint64_t>(AmqpDescriptors::DataAmqpValue), message.m_amqpValueBody);
auto serializedBodyValue = AmqpValue::Serialize(static_cast<AmqpValue>(describedBody));
auto serializedBodyValue = AmqpValue::Serialize(describedBody.AsAmqpValue());
rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end());
}
break;
@ -484,8 +481,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
for (auto const& val : message.m_binaryDataBody)
{
AmqpDescribed describedBody(
static_cast<std::uint64_t>(AmqpDescriptors::DataBinary), static_cast<AmqpValue>(val));
auto serializedBodyValue = AmqpValue::Serialize(static_cast<AmqpValue>(describedBody));
static_cast<std::uint64_t>(AmqpDescriptors::DataBinary), val.AsAmqpValue());
auto serializedBodyValue = AmqpValue::Serialize(describedBody.AsAmqpValue());
rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end());
}
break;
@ -493,16 +490,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
for (auto const& val : message.m_amqpSequenceBody)
{
AmqpDescribed describedBody(
static_cast<std::uint64_t>(AmqpDescriptors::DataAmqpSequence),
static_cast<AmqpValue>(val));
auto serializedBodyValue = AmqpValue::Serialize(static_cast<AmqpValue>(describedBody));
static_cast<std::uint64_t>(AmqpDescriptors::DataAmqpSequence), val.AsAmqpValue());
auto serializedBodyValue = AmqpValue::Serialize(describedBody.AsAmqpValue());
rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end());
}
}
}
if (!message.Footer.empty())
{
AmqpValue footer{amqpvalue_create_footer(AmqpValue{message.Footer})};
AmqpValue footer{Models::_detail::UniqueAmqpValueHandle{
amqpvalue_create_footer(message.Footer.AsAmqpValue())}};
auto serializedFooter = AmqpValue::Serialize(footer);
rv.insert(rv.end(), serializedFooter.begin(), serializedFooter.end());
}
@ -547,7 +544,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
{
auto deserializer = static_cast<AmqpMessageDeserializer*>(context);
deserializer->OnAmqpMessageFieldDecoded(value);
deserializer->OnAmqpMessageFieldDecoded(
_detail::UniqueAmqpValueHandle{amqpvalue_clone(value)});
}
// Invoked when a message field

View File

@ -32,12 +32,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
// properties_get_message_id returns the value in-place.
if (!properties_get_message_id(properties.get(), &value))
{
rv.MessageId = value;
rv.MessageId = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)};
}
if (!properties_get_correlation_id(properties.get(), &value))
{
rv.CorrelationId = value;
rv.CorrelationId = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)};
}
{
@ -53,7 +53,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!properties_get_to(properties.get(), &value))
{
rv.To = value;
rv.To = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)};
}
const char* stringValue;
@ -66,7 +66,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
if (!properties_get_reply_to(properties.get(), &value))
{
rv.ReplyTo = value;
rv.ReplyTo = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)};
}
if (!properties_get_content_type(properties.get(), &stringValue))
@ -114,7 +114,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
UniquePropertiesHandle _internal::MessagePropertiesFactory::ToUamqp(
MessageProperties const& properties)
{
Azure::Core::_internal::UniqueHandle<PROPERTIES_INSTANCE_TAG> returnValue(properties_create());
UniquePropertiesHandle returnValue(properties_create());
if (properties.MessageId.HasValue())
{
if (properties_set_message_id(returnValue.get(), properties.MessageId.Value()))
@ -276,7 +276,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
std::vector<uint8_t> MessageProperties::Serialize(MessageProperties const& properties)
{
auto handle = _internal::MessagePropertiesFactory::ToUamqp(properties);
AmqpValue propertiesAsValue{amqpvalue_create_properties(handle.get())};
Models::_detail::UniqueAmqpValueHandle propertiesAsuAMQPValue{
amqpvalue_create_properties(handle.get())};
AmqpValue propertiesAsValue{propertiesAsuAMQPValue};
return Models::AmqpValue::Serialize(propertiesAsValue);
}

View File

@ -44,6 +44,107 @@ namespace Azure { namespace Core { namespace _internal {
}}} // namespace Azure::Core::_internal
namespace Azure { namespace Core { namespace Amqp { namespace Models {
namespace _detail {
std::ostream& operator<<(std::ostream& os, AMQP_TYPE const& value)
{
switch (value)
{
case AMQP_TYPE_INVALID:
os << "INVALID";
break;
case AMQP_TYPE_NULL:
os << "NULL";
break;
case AMQP_TYPE_BOOL:
os << "BOOL";
break;
case AMQP_TYPE_UBYTE:
os << "UBYTE";
break;
case AMQP_TYPE_USHORT:
os << "USHORT";
break;
case AMQP_TYPE_UINT:
os << "UINT";
break;
case AMQP_TYPE_ULONG:
os << "ULONG";
break;
case AMQP_TYPE_BYTE:
os << "BYTE";
break;
case AMQP_TYPE_SHORT:
os << "SHORT";
break;
case AMQP_TYPE_INT:
os << "INT";
break;
case AMQP_TYPE_LONG:
os << "LONG";
break;
case AMQP_TYPE_FLOAT:
os << "FLOAT";
break;
case AMQP_TYPE_DOUBLE:
os << "DOUBLE";
break;
case AMQP_TYPE_CHAR:
os << "CHAR";
break;
case AMQP_TYPE_TIMESTAMP:
os << "TIMESTAMP";
break;
case AMQP_TYPE_UUID:
os << "UUID";
break;
case AMQP_TYPE_BINARY:
os << "BINARY";
break;
case AMQP_TYPE_STRING:
os << "STRING";
break;
case AMQP_TYPE_SYMBOL:
os << "SYMBOL";
break;
case AMQP_TYPE_LIST:
os << "LIST";
break;
case AMQP_TYPE_MAP:
os << "MAP";
break;
case AMQP_TYPE_ARRAY:
os << "ARRAY";
break;
case AMQP_TYPE_DESCRIBED:
os << "DESCRIBED";
break;
case AMQP_TYPE_COMPOSITE:
os << "COMPOSITE";
break;
case AMQP_TYPE_UNKNOWN:
os << "UNKNOWN";
break;
}
return os;
}
std::ostream& operator<<(std::ostream& os, AMQP_VALUE_DATA_TAG* const value)
{
if (value != nullptr)
{
os << "AMQP_VALUE: " << static_cast<void*>(value) << " " << amqpvalue_get_type(value)
<< ": ";
char* valueAsString{amqpvalue_to_string(value)};
os << valueAsString;
free(valueAsString);
}
else
{
os << "AMQP_VALUE: nullptr";
}
return os;
}
} // namespace _detail
AmqpValue::~AmqpValue() {}
@ -76,19 +177,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
: m_value{amqpvalue_clone(that.m_value.get())}
{
}
AmqpValue::AmqpValue(AmqpValue&& that) noexcept : m_value{that.m_value.release()}
{
that.m_value = nullptr;
}
///@cond
AmqpValue::AmqpValue(AMQP_VALUE_DATA_TAG* value)
AmqpValue::AmqpValue(UniqueAmqpValueHandle const& value)
{
// We shouldn't take ownership of the incoming value, so instead we clone it.
// if no value is provided, treat it as null.
if (value)
{
m_value.reset(amqpvalue_clone(value));
m_value.reset(amqpvalue_clone(value.get()));
}
else
{
@ -97,6 +200,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
/** @brief Internal accessor to convert an AmqpValue to a uAMQP AMQP_VALUE. */
AmqpValue::operator AMQP_VALUE_DATA_TAG*() const { return m_value.get(); }
AMQP_VALUE_DATA_TAG* AmqpValue::Release() { return m_value.release(); }
///@endcond
AmqpValue& AmqpValue::operator=(AmqpValue const& that)
@ -106,7 +211,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
AmqpValue& AmqpValue::operator=(AmqpValue&& that) noexcept
{
m_value.reset(that.m_value.release());
m_value = std::move(that.m_value);
return *this;
}
@ -257,7 +362,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
{
throw std::runtime_error("Could not retrieve uuid value");
}
std::array<uint8_t, 16> uuidArray;
std::array<uint8_t, 16> uuidArray{};
memcpy(uuidArray.data(), value, 16);
return Azure::Core::Uuid::CreateFromArray(uuidArray);
}
@ -423,7 +528,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
static void OnAmqpValueDecoded(void* context, AMQP_VALUE value)
{
auto deserializer = static_cast<AmqpValueDeserializer*>(context);
deserializer->m_decodedValue = value;
deserializer->m_decodedValue = _detail::UniqueAmqpValueHandle{amqpvalue_clone(value)};
}
};
class AmqpValueSerializer final {
@ -468,7 +573,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
std::ostream& operator<<(std::ostream& os, AmqpValue const& value)
{
char* valueAsString = amqpvalue_to_string(value);
char* valueAsString{amqpvalue_to_string(value)};
os << valueAsString;
free(valueAsString);
return os;
@ -507,7 +612,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
m_value.reserve(arraySize);
for (std::uint32_t i = 0; i < arraySize; i += 1)
{
m_value.push_back(amqpvalue_get_array_item(value, i));
// amqpvalue_get_array_item clones the value. We don't need to clone it again.
UniqueAmqpValueHandle item{amqpvalue_get_array_item(value, i)};
m_value.push_back(item);
}
}
AmqpArray::AmqpArray(initializer_type const& initializer) : AmqpCollectionBase(initializer)
@ -561,7 +668,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
key.reset(kv);
val.reset(vv);
}
m_value.emplace(std::make_pair(AmqpValue(key.get()), AmqpValue(val.get())));
m_value.emplace(std::make_pair(AmqpValue(key), AmqpValue(val)));
}
}
@ -593,7 +700,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
for (std::uint32_t i = 0; i < listSize; i += 1)
{
push_back(amqpvalue_get_list_item(value, i));
UniqueAmqpValueHandle item{amqpvalue_get_list_item(value, i)};
push_back(item);
}
}
@ -671,11 +779,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
return symbol;
}
AmqpTimestamp::operator AmqpValue() const
{
return static_cast<UniqueAmqpValueHandle>(*this).get();
}
namespace {
std::chrono::milliseconds GetMillisecondsFromAmqp(AMQP_VALUE value)
{
@ -716,10 +819,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
for (std::uint32_t i = 0; i < compositeSize; i += 1)
{
push_back(amqpvalue_get_composite_item_in_place(value, i));
push_back(_detail::UniqueAmqpValueHandle{
amqpvalue_clone(amqpvalue_get_composite_item_in_place(value, i))});
}
m_descriptor = amqpvalue_get_inplace_descriptor(value);
m_descriptor
= _detail::UniqueAmqpValueHandle{amqpvalue_clone(amqpvalue_get_inplace_descriptor(value))};
if (m_descriptor.IsNull())
{
throw std::runtime_error("Could not read descriptor for composite value.");
@ -733,6 +838,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
{
}
template <>
_detail::AmqpCollectionBase<std::vector<Models::AmqpValue>, AmqpComposite>::
operator UniqueAmqpValueHandle() const
{
return nullptr;
}
AmqpComposite::operator _detail::UniqueAmqpValueHandle() const
{
UniqueAmqpValueHandle composite{
@ -756,13 +868,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
throw std::runtime_error("Input AMQP value MUST be a described value.");
}
m_descriptor = amqpvalue_get_inplace_descriptor(value);
m_descriptor
= _detail::UniqueAmqpValueHandle{amqpvalue_clone(amqpvalue_get_inplace_descriptor(value))};
if (m_descriptor.IsNull())
{
throw std::runtime_error("Could not read descriptor for described value.");
}
m_value = amqpvalue_get_inplace_described_value(value);
m_value = _detail::UniqueAmqpValueHandle{
amqpvalue_clone(amqpvalue_get_inplace_described_value(value))};
if (m_value.IsNull())
{
throw std::runtime_error("Could not read descriptor for described value.");
@ -770,7 +884,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
AmqpDescribed::AmqpDescribed(AmqpSymbol const& descriptor, AmqpValue const& value)
: m_descriptor(static_cast<UniqueAmqpValueHandle>(descriptor).get()), m_value(value)
: m_descriptor{descriptor.AsAmqpValue()}, m_value{value}
{
}
AmqpDescribed::AmqpDescribed(uint64_t descriptor, AmqpValue const& value)
@ -788,9 +902,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
return composite;
}
AmqpValue AmqpDescribed::AsAmqpValue() const
{
return AmqpValue{static_cast<_detail::UniqueAmqpValueHandle>(*this)};
}
AmqpDescribed::operator AmqpValue const() const
{
return static_cast<UniqueAmqpValueHandle>(*this).get();
return static_cast<UniqueAmqpValueHandle>(*this);
}
namespace {
@ -864,7 +983,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
std::ostream& operator<<(std::ostream& os, AmqpArray const& value)
{
// Let the AmqpValue specialization handle serialization of the array.
AmqpValue arrayValue(value);
AmqpValue arrayValue(value.AsAmqpValue());
os << arrayValue;
return os;
}
@ -872,7 +991,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
std::ostream& operator<<(std::ostream& os, AmqpList const& value)
{
// Let the AmqpValue specialization handle serialization of the list.
AmqpValue arrayValue(value);
AmqpValue arrayValue(value.AsAmqpValue());
os << arrayValue;
return os;
}
@ -880,14 +999,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
std::ostream& operator<<(std::ostream& os, AmqpMap const& value)
{
// Let the AmqpValue specialization handle serialization of the map.
AmqpValue mapValue(value);
AmqpValue mapValue(value.AsAmqpValue());
os << mapValue;
return os;
}
std::ostream& operator<<(std::ostream& os, AmqpSymbol const& value)
{
// Let the AmqpValue specialization handle serialization of the array.
AmqpValue arrayValue(static_cast<AmqpValue>(value));
AmqpValue arrayValue(value.AsAmqpValue());
os << arrayValue;
return os;
}

View File

@ -161,9 +161,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
if (!options.DynamicNodeProperties.empty())
{
if (source_set_dynamic_node_properties(
m_source.get(),
static_cast<_detail::UniqueAmqpValueHandle>(options.DynamicNodeProperties).get()))
AmqpValue dynamicNodeProperties(options.DynamicNodeProperties.AsAmqpValue());
if (source_set_dynamic_node_properties(m_source.get(), dynamicNodeProperties))
{
throw std::runtime_error("Could not set dynamic node properties.");
}
@ -177,8 +176,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
if (!options.Filter.empty())
{
if (source_set_filter(
m_source.get(), static_cast<_detail::UniqueAmqpValueHandle>(options.Filter).get()))
if (source_set_filter(m_source.get(), options.Filter.AsAmqpValue()))
{
throw std::runtime_error("Could not set filter set.");
}
@ -192,17 +190,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
if (!options.Outcomes.empty())
{
if (source_set_outcomes(
m_source.get(), static_cast<_detail::UniqueAmqpValueHandle>(options.Outcomes).get()))
if (source_set_outcomes(m_source.get(), options.Outcomes.AsAmqpValue()))
{
throw std::runtime_error("Could not set outcomes.");
}
}
if (!options.Capabilities.empty())
{
if (source_set_capabilities(
m_source.get(),
static_cast<_detail::UniqueAmqpValueHandle>(options.Capabilities).get()))
if (source_set_capabilities(m_source.get(), options.Capabilities.AsAmqpValue()))
{
throw std::runtime_error("Could not set capabilities.");
}
@ -212,7 +207,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
// Convert the MessageSource into a Value.
Models::AmqpValue MessageSource::AsAmqpValue() const
{
return amqpvalue_create_source(m_source.get());
Models::_detail::UniqueAmqpValueHandle sourceValue{amqpvalue_create_source(m_source.get())};
return sourceValue;
}
Models::AmqpValue MessageSource::GetAddress() const
@ -222,7 +218,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
{
throw std::runtime_error("Could not retrieve address from source.");
}
return address;
// source_get_address does not reference its value, so we need to reference it before creating
// an AmqpValueHandle.
return Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(address)};
}
TerminusDurability MessageSource::GetTerminusDurability() const
@ -326,7 +324,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
{
throw std::runtime_error("Could not get default outcome.");
}
return value;
// source_get_default_outcome does not reference the value returned, we reference it so it can
// be put into a UniqueAmqpValueHandle.
return Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(value)};
}
Models::AmqpArray MessageSource::GetOutcomes() const
@ -432,14 +432,22 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
AMQP_VALUE outcome;
if (!source_get_default_outcome(source, &outcome))
{
os << ", Default Outcome: " << AmqpValue{outcome};
// source_get_default_outcome does not reference the value returned, we reference it so it
// can be put into a UniqueAmqpValueHandle.
os << ", Default Outcome: "
<< AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(outcome)}};
}
}
{
AMQP_VALUE outcomes;
if (!source_get_outcomes(source, &outcomes))
{
os << ", Outcomes: " << AmqpValue{outcomes};
// Most of the time, source_get_outcomes does not reference its input. However, if the
// composite value at location 9 is a symbol, it does reference it. As a consequence, this
// will leak an AMQPSymbol if the value at location 9 is a symbol (as opposed to being an
// array).
os << ", Outcomes: "
<< AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(outcomes)}};
}
}
os << "}";

View File

@ -3,6 +3,8 @@
#include "azure/core/amqp/internal/models/message_target.hpp"
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure_uamqp_c/amqp_definitions_fields.h>
#include <azure_uamqp_c/amqp_definitions_terminus_durability.h>
#include <azure_uamqp_c/amqp_definitions_terminus_expiry_policy.h>
@ -15,6 +17,9 @@
#include <iostream>
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;
namespace Azure { namespace Core { namespace _internal {
void UniqueHandleHelper<TARGET_INSTANCE_TAG>::FreeMessageTarget(TARGET_HANDLE value)
{
@ -23,19 +28,18 @@ namespace Azure { namespace Core { namespace _internal {
}}} // namespace Azure::Core::_internal
namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal {
extern const char* StringFromTerminusDurability(TerminusDurability);
// MessageTarget::MessageTarget(TARGET_HANDLE handle) : m_target{handle} {}
MessageTarget::MessageTarget(Models::AmqpValue const& source)
MessageTarget::MessageTarget(Models::AmqpValue const& target)
{
if (source.IsNull())
if (target.IsNull())
{
throw std::invalid_argument("source cannot be null");
throw std::invalid_argument("target cannot be null");
}
TARGET_HANDLE targetHandle;
if (amqpvalue_get_target(source, &targetHandle))
if (amqpvalue_get_target(target, &targetHandle))
{
throw std::runtime_error("Could not retrieve source from value.");
throw std::runtime_error("Could not retrieve target from value.");
}
m_target.reset(targetHandle);
}
@ -69,7 +73,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
MessageTarget& MessageTarget::operator=(MessageTarget const& that)
{
m_target.reset(target_clone(that.m_target.get()));
m_target.reset(target_clone(that));
return *this;
}
@ -151,8 +155,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
if (!options.DynamicNodeProperties.empty())
{
if (target_set_dynamic_node_properties(
m_target.get(),
static_cast<_detail::UniqueAmqpValueHandle>(options.DynamicNodeProperties).get()))
m_target.get(), options.DynamicNodeProperties.AsAmqpValue()))
{
throw std::runtime_error("Could not set dynamic node properties.");
}
@ -160,9 +163,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
if (!options.Capabilities.empty())
{
if (target_set_capabilities(
m_target.get(),
static_cast<_detail::UniqueAmqpValueHandle>(options.Capabilities).get()))
if (target_set_capabilities(m_target.get(), options.Capabilities.AsAmqpValue()))
{
throw std::runtime_error("Could not set capabilities.");
}
@ -172,7 +173,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
// Convert the MessageSource into a Value.
Models::AmqpValue MessageTarget::AsAmqpValue() const
{
return amqpvalue_create_target(m_target.get());
Models::_detail::UniqueAmqpValueHandle targetValue{amqpvalue_create_target(m_target.get())};
return targetValue;
}
Models::AmqpValue MessageTarget::GetAddress() const
@ -182,7 +184,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
{
throw std::runtime_error("Could not retrieve address from source.");
}
return address;
// target_get_address does not reference the underlying address so we need to addref it here so
// it gets freed properly.
return Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(address)};
}
TerminusDurability MessageTarget::GetTerminusDurability() const
@ -254,11 +258,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
Models::AmqpMap MessageTarget::GetDynamicNodeProperties() const
{
AMQP_VALUE value;
// Note: target_get_dynamic_node_properties does NOT reference the value.
if (target_get_dynamic_node_properties(m_target.get(), &value))
{
throw std::runtime_error("Could not get dynamic.");
}
return AmqpValue{value}.AsMap();
// We clone the value before converting it to a UniqueAmqpValueHandle because the destructor for
// UniqueAmqpValueHandle will remove the reference.
return AmqpValue{Models::_detail::UniqueAmqpValueHandle{amqpvalue_clone(value)}}.AsMap();
}
Models::AmqpArray MessageTarget::GetCapabilities() const
@ -271,8 +278,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
return value;
}
extern const char* StringFromTerminusDurability(TerminusDurability);
std::ostream& operator<<(std::ostream& os, MessageTarget const& target)
{
os << "Target{ ";

View File

@ -10,7 +10,7 @@
namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace _internal {
Models::AmqpValue Messaging::DeliveryAccepted()
{
auto rv = messaging_delivery_accepted();
Models::_detail::UniqueAmqpValueHandle rv{messaging_delivery_accepted()};
if (!rv)
{
throw std::runtime_error("Could not allocate delivery accepted described value.");
@ -19,7 +19,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
Models::AmqpValue Messaging::DeliveryReleased()
{
auto rv = messaging_delivery_released();
Models::_detail::UniqueAmqpValueHandle rv{messaging_delivery_released()};
if (!rv)
{
throw std::runtime_error("Could not allocate delivery released described value.");
@ -28,7 +28,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
Models::AmqpValue Messaging::DeliveryReceived(uint32_t sectionNumber, uint64_t sectionOffset)
{
auto rv = messaging_delivery_received(sectionNumber, sectionOffset);
Models::_detail::UniqueAmqpValueHandle rv{
messaging_delivery_received(sectionNumber, sectionOffset)};
if (!rv)
{
throw std::runtime_error("Could not allocate delivery received described value.");
@ -40,10 +41,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
std::string const& errorDescription,
AmqpValue const& errorInfo)
{
auto rv = messaging_delivery_rejected(
Models::_detail::UniqueAmqpValueHandle rv{messaging_delivery_rejected(
errorCondition.empty() ? nullptr : errorCondition.c_str(),
errorDescription.empty() ? nullptr : errorDescription.c_str(),
errorInfo);
errorInfo)};
if (!rv)
{
throw std::runtime_error("Could not allocate delivery rejected described value.");
@ -55,7 +56,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
bool undeliverableHere,
Models::AmqpValue annotations)
{
auto rv = messaging_delivery_modified(deliveryFailed, undeliverableHere, annotations);
Models::_detail::UniqueAmqpValueHandle rv{
messaging_delivery_modified(deliveryFailed, undeliverableHere, annotations)};
if (!rv)
{
throw std::runtime_error("Could not allocate delivery modified described value.");

View File

@ -133,7 +133,7 @@ TEST_F(TestMessage, TestBodyAmqpSequence)
{
AmqpMessage message;
message.SetBody({"Test", 95, static_cast<AmqpValue>(AmqpMap{{3, 5}, {4, 9}})});
message.SetBody({"Test", 95, AmqpMap{{3, 5}, {4, 9}}.AsAmqpValue()});
EXPECT_EQ(1, message.GetBodyAsAmqpList().size());
EXPECT_EQ("Test", static_cast<std::string>(message.GetBodyAsAmqpList()[0].at(0)));
@ -152,8 +152,7 @@ TEST_F(TestMessage, TestBodyAmqpSequence)
}
{
AmqpMessage message;
message.SetBody(
{{1}, {"Test", 3}, {"Test", 95, static_cast<AmqpValue>(AmqpMap{{3, 5}, {4, 9}})}});
message.SetBody({{1}, {"Test", 3}, {"Test", 95, AmqpMap{{3, 5}, {4, 9}}.AsAmqpValue()}});
EXPECT_EQ(3, message.GetBodyAsAmqpList().size());
EXPECT_EQ("Test", static_cast<std::string>(message.GetBodyAsAmqpList()[1].at(0)));
EXPECT_EQ(95, static_cast<int32_t>(message.GetBodyAsAmqpList()[2].at(1)));
@ -231,7 +230,7 @@ TEST_F(MessageSerialization, SerializeMessageBodyBinary)
std::vector<uint8_t> buffer;
AmqpMessage message;
message.Properties.MessageId = "12345";
message.SetBody(static_cast<AmqpValue>(AmqpMap{{"key1", "value1"}, {"key2", "value2"}}));
message.SetBody(AmqpMap{{"key1", "value1"}, {"key2", "value2"}}.AsAmqpValue());
buffer = AmqpMessage::Serialize(message);
AmqpMessage deserialized = AmqpMessage::Deserialize(buffer.data(), buffer.size());
EXPECT_EQ(message, deserialized);

View File

@ -214,9 +214,9 @@ TEST_F(TestValues, TestBinary)
AmqpBinaryData binaryData;
binaryData.push_back('a');
binaryData.push_back(3);
AmqpValue value(static_cast<_detail::UniqueAmqpValueHandle>(binaryData).get());
AmqpValue value{binaryData.AsAmqpValue()};
EXPECT_FALSE(value < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(binaryData).get()));
EXPECT_FALSE(value < binaryData.AsAmqpValue());
AmqpBinaryData data2(value);
EXPECT_EQ(2, data2.size());
@ -250,10 +250,10 @@ TEST_F(TestValues, TestList)
EXPECT_EQ(AmqpValueType::Byte, list1[3].GetType());
EXPECT_EQ(AmqpValue('a'), list1[3]);
AmqpValue value(static_cast<_detail::UniqueAmqpValueHandle>(list1).get());
AmqpValue value{list1.AsAmqpValue()};
const AmqpList list2(value);
EXPECT_FALSE(value < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(list1).get()));
EXPECT_FALSE(value < list1.AsAmqpValue());
EXPECT_EQ(4, list2.size());
@ -268,8 +268,8 @@ TEST_F(TestValues, TestList)
AmqpList test;
AmqpDescribed desc{
static_cast<uint64_t>(29),
static_cast<AmqpValue>(AmqpList{AmqpValue{"test:error"}, AmqpValue{"test description"}})};
test.push_back(AmqpValue{desc});
AmqpList{AmqpValue{"test:error"}, AmqpValue{"test description"}}.AsAmqpValue()};
test.push_back(desc.AsAmqpValue());
EXPECT_EQ(1, test.size());
EXPECT_EQ(AmqpValueType::Described, test[0].GetType());
@ -291,7 +291,7 @@ TEST_F(TestValues, TestList)
AmqpComposite testAsComposite{test[0].AsComposite()};
EXPECT_EQ(testAsComposite.GetDescriptor(), AmqpValue{static_cast<uint64_t>(29ll)});
{
AmqpValue testAsValue{test};
AmqpValue testAsValue{test.AsAmqpValue()};
EXPECT_EQ(AmqpValueType::List, testAsValue.GetType());
auto testAsList{testAsValue.AsList()};
@ -329,9 +329,9 @@ TEST_F(TestValues, TestMap)
EXPECT_EQ(std::string("ABC"), static_cast<std::string>(map1[AmqpValue(3)]));
// Now round-trip the map through an AMQP value and confirm that the values persist.
AmqpValue valueOfMap = static_cast<_detail::UniqueAmqpValueHandle>(map1).get();
AmqpValue valueOfMap = map1.AsAmqpValue();
AmqpMap map2(valueOfMap);
EXPECT_FALSE(valueOfMap < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(map1).get()));
EXPECT_FALSE(valueOfMap < map1.AsAmqpValue());
EXPECT_EQ(5, static_cast<int32_t>(map2["ABC"]));
EXPECT_EQ(std::string("ABC"), static_cast<std::string>(map2[AmqpValue(3)]));
@ -345,7 +345,7 @@ TEST_F(TestValues, TestArray)
EXPECT_EQ(5, array1.size());
AmqpValue value = static_cast<AmqpValue>(array1);
AmqpValue value = array1.AsAmqpValue();
EXPECT_EQ(AmqpValueType::Array, value.GetType());
const AmqpArray array2 = value.AsArray();
@ -354,20 +354,9 @@ TEST_F(TestValues, TestArray)
EXPECT_EQ(3, static_cast<std::int32_t>(array2.at(1)));
EXPECT_EQ(5, static_cast<std::int32_t>(array2.at(2)));
EXPECT_FALSE(array1 < array2);
EXPECT_FALSE(value < AmqpValue(static_cast<_detail::UniqueAmqpValueHandle>(array2).get()));
}
TEST_F(TestValues, TestArray1)
{
AmqpArray array1{1};
EXPECT_EQ(1, array1.size());
AmqpValue value = static_cast<AmqpValue>(array1);
EXPECT_EQ(AmqpValueType::Array, value.GetType());
GTEST_LOG_(INFO) << "Copy AMQP value as array to a new value";
const AmqpArray array2 = value.AsArray();
{
EXPECT_FALSE(value < array2.AsAmqpValue());
}
}
TEST_F(TestValues, TestArrayDifferentTypes)
@ -399,7 +388,7 @@ TEST_F(TestValues, TestTimestamp)
std::chrono::system_clock::now().time_since_epoch())};
AmqpTimestamp value{timeNow};
EXPECT_EQ(static_cast<std::chrono::milliseconds>(value), timeNow);
AmqpValue av{value};
AmqpValue av{value.AsAmqpValue()};
AmqpTimestamp ts2{av.AsTimestamp()};
EXPECT_EQ(timeNow, static_cast<std::chrono::milliseconds>(ts2));
@ -429,7 +418,7 @@ TEST_F(TestValues, TestCompositeValue)
{
{
AmqpComposite value("My Composite Type", {1, 2, 5.5, "ABC", 5});
EXPECT_EQ(AmqpValueType::Composite, AmqpValue(value).GetType());
EXPECT_EQ(AmqpValueType::Composite, value.AsAmqpValue().GetType());
EXPECT_EQ(5, value.size());
}
@ -455,7 +444,7 @@ TEST_F(TestValues, TestCompositeValue)
// Put some things in the map.
{
AmqpComposite compositeVal(static_cast<uint64_t>(116ull), {25, 25.0f});
AmqpValue value = static_cast<AmqpValue>(compositeVal);
AmqpValue value = compositeVal.AsAmqpValue();
AmqpComposite testVal(value.AsComposite());
EXPECT_EQ(compositeVal.size(), testVal.size());
@ -476,7 +465,7 @@ TEST_F(TestValues, TestDescribed)
EXPECT_EQ(AmqpSymbol("My Composite Type"), described1.GetDescriptor().AsSymbol());
EXPECT_EQ(5, static_cast<int32_t>(described1.GetValue()));
AmqpValue value = static_cast<AmqpValue>(described1);
AmqpValue value = described1.AsAmqpValue();
EXPECT_EQ(AmqpValueType::Described, value.GetType());
AmqpDescribed described2 = value.AsDescribed();
@ -493,7 +482,7 @@ TEST_F(TestValues, TestDescribed)
EXPECT_EQ(937, static_cast<uint64_t>(value.GetDescriptor()));
EXPECT_EQ(5, static_cast<int32_t>(value.GetValue()));
AmqpValue value2 = static_cast<AmqpValue>(value);
AmqpValue value2 = value.AsAmqpValue();
AmqpDescribed described2 = value2.AsDescribed();
EXPECT_EQ(AmqpValueType::Described, value2.GetType());
@ -1272,7 +1261,7 @@ TEST_F(TestValueSerialization, SerializeList)
// First form, serialized as first form.
{
AmqpList emptyList;
AmqpValue value{emptyList};
AmqpValue value{emptyList.AsAmqpValue()};
std::vector<uint8_t> val = AmqpValue::Serialize(value);
EXPECT_EQ(val.size(), 1);
EXPECT_EQ(0x45, val[0]);

View File

@ -47,6 +47,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
auto openResult = management.Open();
EXPECT_EQ(openResult, ManagementOpenStatus::Error);
}
TEST_F(TestManagement, ManagementOpenClose)
{
MessageTests::AmqpServerMock mockServer;
@ -69,6 +70,56 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
mockServer.StopListening();
}
TEST_F(TestManagement, ManagementOpenCloseAuthenticated)
{
MessageTests::AmqpServerMock mockServer;
auto sasCredential = std::make_shared<ServiceBusSasConnectionStringCredential>(
"Endpoint=amqp://localhost:" + std::to_string(mockServer.GetPort())
+ "/;SharedAccessKeyName=MyTestKey;SharedAccessKey=abcdabcd;EntityPath=testLocation");
ConnectionOptions connectionOptions;
connectionOptions.Port = mockServer.GetPort();
Connection connection("localhost", sasCredential, connectionOptions);
Session session{connection.CreateSession({})};
ManagementClientOptions options;
options.EnableTrace = 1;
ManagementClient management(session.CreateManagementClient("Test", options));
mockServer.StartListening();
auto openResult = management.Open();
EXPECT_EQ(openResult, ManagementOpenStatus::Ok);
management.Close();
mockServer.StopListening();
}
TEST_F(TestManagement, ManagementOpenCloseError)
{
MessageTests::AmqpServerMock mockServer;
ConnectionOptions connectionOptions;
connectionOptions.Port = mockServer.GetPort();
Connection connection("localhost", nullptr, connectionOptions);
Session session{connection.CreateSession({})};
ManagementClientOptions options;
options.EnableTrace = 1;
ManagementClient management(session.CreateManagementClient("Test", options));
mockServer.StartListening();
Azure::Core::Context context;
context.Cancel();
EXPECT_EQ(management.Open(context), ManagementOpenStatus::Cancelled);
management.Close();
mockServer.StopListening();
}
#endif // !defined(AZ_PLATFORM_MAC)
#if !defined(AZ_PLATFORM_MAC)
@ -162,7 +213,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
ConnectionOptions connectionOptions;
connectionOptions.Port = mockServer.GetPort();
Connection connection("localhost", nullptr, connectionOptions);
auto sasCredential = std::make_shared<ServiceBusSasConnectionStringCredential>(
"Endpoint=amqp://localhost:" + std::to_string(mockServer.GetPort())
+ "/;SharedAccessKeyName=MyTestKey;SharedAccessKey=abcdabcd;EntityPath=testLocation");
Connection connection("localhost", sasCredential, connectionOptions);
Session session{connection.CreateSession({})};
ManagementClientOptions options;
options.EnableTrace = 1;

View File

@ -23,7 +23,7 @@
namespace Azure { namespace Core { namespace Amqp { namespace Tests {
extern uint16_t FindAvailableSocket();
class TestMessages : public testing::Test {
class TestMessageSendReceive : public testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
@ -33,7 +33,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
using namespace Azure::Core::Amqp;
#if !defined(AZ_PLATFORM_MAC)
TEST_F(TestMessages, SimpleReceiver)
TEST_F(TestMessageSendReceive, SimpleReceiver)
{
// Create a connection
@ -48,8 +48,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
MessageReceiver receiver1(session.CreateMessageReceiver("MySource", {}, nullptr));
MessageReceiver receiver2(session.CreateMessageReceiver("MySource", {}, nullptr));
}
GTEST_LOG_(INFO) << _internal::MessageReceiverState::Invalid
<< _internal::MessageReceiverState::Closing
<< _internal::MessageReceiverState::Idle
<< _internal::MessageReceiverState::Opening
<< _internal::MessageReceiverState::Open
<< _internal::MessageReceiverState::Error;
EXPECT_ANY_THROW(GTEST_LOG_(INFO) << static_cast<_internal::MessageReceiverState>(5993));
}
TEST_F(TestMessages, ReceiverProperties)
TEST_F(TestMessageSendReceive, ReceiverProperties)
{ // Create a connection
Connection connection("localhost", nullptr, {});
Session session{connection.CreateSession()};
@ -70,7 +78,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
}
}
TEST_F(TestMessages, SimpleSender)
TEST_F(TestMessageSendReceive, SimpleSender)
{
// Create a connection
@ -85,8 +93,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
MessageSender sender1(session.CreateMessageSender("MySource", {}, nullptr));
MessageSender sender2(session.CreateMessageSender("MySource", {}, nullptr));
}
GTEST_LOG_(INFO) << _internal::MessageSenderState::Invalid
<< _internal::MessageSenderState::Closing
<< _internal::MessageSenderState::Idle
<< _internal::MessageSenderState::Opening
<< _internal::MessageSenderState::Open << _internal::MessageSenderState::Error;
EXPECT_ANY_THROW(GTEST_LOG_(INFO) << static_cast<_internal::MessageSenderState>(5993));
}
TEST_F(TestMessages, SenderProperties)
TEST_F(TestMessageSendReceive, SenderProperties)
{ // Create a connection
Connection connection("localhost", nullptr, {});
Session session{connection.CreateSession()};
@ -98,7 +112,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
}
}
TEST_F(TestMessages, ReceiverOpenClose)
TEST_F(TestMessageSendReceive, ReceiverOpenClose)
{
MessageTests::AmqpServerMock mockServer;
@ -156,7 +170,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
context.Cancel();
}
TEST_F(TestMessages, SenderOpenClose)
TEST_F(TestMessageSendReceive, SenderOpenClose)
{
uint16_t testPort = FindAvailableSocket();
GTEST_LOG_(INFO) << "Test port: " << testPort;
@ -181,7 +195,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
listener.Stop();
}
TEST_F(TestMessages, TestLocalhostVsTls)
TEST_F(TestMessageSendReceive, TestLocalhostVsTls)
{
MessageTests::AmqpServerMock mockServer(5671);
@ -222,6 +236,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
session.CreateMessageSender("localhost/ingress", options, &senderEvents));
EXPECT_NO_THROW(sender.Open());
EXPECT_EQ(65536, sender.GetMaxMessageSize());
Azure::Core::Amqp::Models::AmqpMessage message;
message.SetBody(Azure::Core::Amqp::Models::AmqpBinaryData{'h', 'e', 'l', 'l', 'o'});
@ -249,7 +265,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
mockServer.StopListening();
}
TEST_F(TestMessages, SenderSendAsync)
TEST_F(TestMessageSendReceive, SenderSendAsync)
{
MessageTests::AmqpServerMock mockServer{};
@ -309,7 +325,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
mockServer.StopListening();
}
TEST_F(TestMessages, SenderSendSync)
TEST_F(TestMessageSendReceive, SenderSendSync)
{
MessageTests::AmqpServerMock mockServer{};
ConnectionOptions connectionOptions;
@ -352,7 +368,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
mockServer.StopListening();
}
TEST_F(TestMessages, AuthenticatedSender)
TEST_F(TestMessageSendReceive, AuthenticatedSender)
{
MessageTests::AmqpServerMock server;
@ -388,7 +404,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
server.StopListening();
}
TEST_F(TestMessages, AuthenticatedSenderAzureToken)
TEST_F(TestMessageSendReceive, AuthenticatedSenderAzureToken)
{
class AzureTokenCredential : public Azure::Core::Credentials::TokenCredential {
Azure::Core::Credentials::AccessToken GetToken(
@ -441,7 +457,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
server.StopListening();
}
TEST_F(TestMessages, AuthenticatedReceiver)
TEST_F(TestMessageSendReceive, AuthenticatedReceiver)
{
class ReceiverMock : public MessageTests::AmqpServerMock {
public:
@ -465,8 +481,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
sendMessage.SetBody(Azure::Core::Amqp::Models::AmqpValue{"This is a message body."});
if (m_linkMessageQueues.at(m_senderNodeName).LinkSender)
{
m_linkMessageQueues.at(m_senderNodeName).LinkSender->Send(sendMessage);
GTEST_LOG_(INFO) << "Sent, resetting should send.";
m_shouldSendMessage = false;
m_linkMessageQueues.at(m_senderNodeName).LinkSender->Send(sendMessage);
}
else
{
GTEST_LOG_(INFO) << "No sender, skipping";
}
}
}
@ -494,6 +515,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
receiverOptions.MessageTarget = "egress";
receiverOptions.SettleMode = Azure::Core::Amqp::_internal::ReceiverSettleMode::First;
receiverOptions.MaxMessageSize = 65536;
receiverOptions.MaxLinkCredit = 500; // We allow at most 500 messages to be received.
receiverOptions.Name = "receiver-link";
receiverOptions.EnableTrace = true;
MessageReceiver receiver(session.CreateMessageReceiver(
@ -505,6 +527,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
{
server.ShouldSendMessage(true);
auto message = receiver.WaitForIncomingMessage();
GTEST_LOG_(INFO) << "Received message.";
ASSERT_TRUE(message.first.HasValue());
ASSERT_FALSE(message.second);
EXPECT_EQ(
@ -519,11 +542,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
auto message = receiver.WaitForIncomingMessage(receiveContext),
Azure::Core::OperationCancelledException);
}
{
auto result = receiver.TryWaitForIncomingMessage();
EXPECT_FALSE(result.first.HasValue());
}
{
GTEST_LOG_(INFO) << "Trigger message send for polling.";
server.ShouldSendMessage(true);
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
GTEST_LOG_(INFO) << "Message should have been sent and processed.";
auto result = receiver.TryWaitForIncomingMessage();
EXPECT_TRUE(result.first.HasValue());
}
receiver.Close();
server.StopListening();
}
TEST_F(TestMessages, AuthenticatedReceiverAzureToken)
TEST_F(TestMessageSendReceive, AuthenticatedReceiverAzureToken)
{
class ReceiverMock : public MessageTests::AmqpServerMock {
public:
@ -626,5 +663,89 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
receiver.Close();
server.StopListening();
}
TEST_F(TestMessageSendReceive, AuthenticatedReceiverTryReceive)
{
class ReceiverMock : public MessageTests::AmqpServerMock {
public:
void ShouldSendMessage(bool shouldSend) { m_shouldSendMessage = shouldSend; }
void SetSenderNodeName(std::string const& senderNodeName)
{
m_senderNodeName = senderNodeName;
}
private:
mutable bool m_shouldSendMessage{false};
std::string m_senderNodeName;
void Poll() const override
{
if (m_shouldSendMessage
&& m_linkMessageQueues.find(m_senderNodeName) != m_linkMessageQueues.end())
{
GTEST_LOG_(INFO) << "Sending message to client." + m_senderNodeName;
Azure::Core::Amqp::Models::AmqpMessage sendMessage;
sendMessage.SetBody(Azure::Core::Amqp::Models::AmqpValue{"This is a message body."});
if (m_linkMessageQueues.at(m_senderNodeName).LinkSender)
{
GTEST_LOG_(INFO) << "Sent, resetting should send.";
m_shouldSendMessage = false;
m_linkMessageQueues.at(m_senderNodeName).LinkSender->Send(sendMessage);
}
else
{
GTEST_LOG_(INFO) << "No sender, skipping";
}
}
}
};
ReceiverMock server;
auto sasCredential = std::make_shared<ServiceBusSasConnectionStringCredential>(
"Endpoint=amqp://localhost:" + std::to_string(server.GetPort())
+ "/;SharedAccessKeyName=MyTestKey;SharedAccessKey=abcdabcd;EntityPath=testLocation");
ConnectionOptions connectionOptions;
// connectionOptions.IdleTimeout = std::chrono::minutes(5);
connectionOptions.ContainerId = testing::UnitTest::GetInstance()->current_test_info()->name();
connectionOptions.Port = server.GetPort();
Connection connection("localhost", sasCredential, connectionOptions);
Session session{connection.CreateSession()};
server.SetSenderNodeName(sasCredential->GetEndpoint() + sasCredential->GetEntityPath());
server.StartListening();
MessageReceiverOptions receiverOptions;
receiverOptions.Name = "receiver-link";
receiverOptions.MessageTarget = "egress";
receiverOptions.SettleMode = Azure::Core::Amqp::_internal::ReceiverSettleMode::First;
receiverOptions.MaxMessageSize = 65536;
receiverOptions.MaxLinkCredit = 500; // We allow at most 500 messages to be received.
receiverOptions.Name = "receiver-link";
receiverOptions.EnableTrace = true;
MessageReceiver receiver(session.CreateMessageReceiver(
sasCredential->GetEndpoint() + sasCredential->GetEntityPath(), receiverOptions, nullptr));
receiver.Open();
{
auto result = receiver.TryWaitForIncomingMessage();
EXPECT_FALSE(result.first.HasValue());
}
{
GTEST_LOG_(INFO) << "Trigger message send for polling.";
server.ShouldSendMessage(true);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
GTEST_LOG_(INFO) << "Message should have been sent and processed.";
auto result = receiver.TryWaitForIncomingMessage();
EXPECT_TRUE(result.first.HasValue());
}
receiver.Close();
server.StopListening();
}
#endif // !defined(AZ_PLATFORM_MAC)
}}}} // namespace Azure::Core::Amqp::Tests

View File

@ -33,12 +33,12 @@ TEST_F(TestSourceTarget, SimpleSourceTarget)
{
EXPECT_ANY_THROW(MessageSource source(AmqpValue{}));
AmqpValue val = static_cast<AmqpValue>(AmqpArray());
AmqpValue val = AmqpArray().AsAmqpValue();
EXPECT_ANY_THROW(MessageSource source{val});
}
{
EXPECT_ANY_THROW(MessageTarget target(AmqpValue{}));
AmqpValue val = static_cast<AmqpValue>(AmqpArray());
AmqpValue val = AmqpArray().AsAmqpValue();
EXPECT_ANY_THROW(MessageTarget target(val));
}
}
@ -79,7 +79,7 @@ TEST_F(TestSourceTarget, TargetProperties)
{
MessageTargetOptions options;
options.Capabilities.push_back(static_cast<AmqpValue>(AmqpSymbol{"Test"}));
options.Capabilities.push_back(AmqpSymbol{"Test"}.AsAmqpValue());
MessageTarget target(options);
EXPECT_EQ(1, target.GetCapabilities().size());
EXPECT_EQ(AmqpValueType::Symbol, target.GetCapabilities()[0].GetType());
@ -201,6 +201,19 @@ TEST_F(TestSourceTarget, TargetProperties)
}
}
TEST_F(TestSourceTarget, TargetCreateCopy)
{
{
MessageTarget target("address1");
const AmqpValue v = target.AsAmqpValue();
// AmqpValue value(v);
// MessageTarget target2(value);
MessageTarget target2(v);
EXPECT_EQ(target.GetAddress(), target2.GetAddress());
}
}
TEST_F(TestSourceTarget, TargetThroughValue)
{
MessageTarget target("address1");
@ -238,7 +251,7 @@ TEST_F(TestSourceTarget, SourceProperties)
{
MessageSourceOptions options;
options.Capabilities.push_back(static_cast<AmqpValue>(AmqpSymbol{"Test"}));
options.Capabilities.push_back(AmqpSymbol{"Test"}.AsAmqpValue());
MessageSource source(options);
EXPECT_EQ(1, source.GetCapabilities().size());
EXPECT_EQ(AmqpValueType::Symbol, source.GetCapabilities()[0].GetType());
@ -374,7 +387,7 @@ TEST_F(TestSourceTarget, SourceProperties)
{
MessageSourceOptions options;
options.Outcomes.push_back(static_cast<AmqpValue>(AmqpSymbol("Test")));
options.Outcomes.push_back(AmqpSymbol("Test").AsAmqpValue());
MessageSource source(options);
EXPECT_EQ(1, source.GetOutcomes().size());
EXPECT_EQ(AmqpValueType::Symbol, source.GetOutcomes().at(0).GetType());

View File

@ -104,6 +104,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
= m_linkMessageQueues[nodeName].MessageSenderPresentQueue.WaitForResult(context);
return result != nullptr;
}
std::unique_ptr<Azure::Core::Amqp::Models::AmqpMessage> TryWaitForMessage(
std::string const& nodeName)
{
// Poll for completion on both the mock server and the connection, that ensures that
// we can implement unsolicited sends from the Poll function.
auto result = m_linkMessageQueues[nodeName].MessageQueue.TryWaitForResult();
if (result)
{
return std::move(std::get<0>(*result));
}
else
{
Poll();
return nullptr;
}
}
std::unique_ptr<Azure::Core::Amqp::Models::AmqpMessage> WaitForMessage(
std::string const& nodeName)
{
@ -133,13 +151,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
std::string const& nodeName,
MessageLinkComponents const& linkComponents)
{
GTEST_LOG_(INFO) << "Wait for incoming message.";
auto message = WaitForMessage(nodeName);
if (!message)
{
GTEST_LOG_(INFO) << "No message, canceling thread";
}
else
auto message = TryWaitForMessage(nodeName);
if (message)
{
GTEST_LOG_(INFO) << "Received message: " << *message;
if (nodeName == "$cbs" && IsCbsMessage(*message))
@ -151,6 +164,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
MessageReceived(nodeName, linkComponents, *message);
}
}
std::this_thread::yield();
};
public:

View File

@ -1731,7 +1731,8 @@ void link_dowork(LINK_HANDLE link)
{
LogError("NULL link");
}
else
// The caller of link_dowork has no way of determining the state of the link, so skip doing work if the link is not attached.
else if (link->link_state == LINK_STATE_ATTACHED)
{
tickcounter_ms_t current_tick;

View File

@ -155,6 +155,26 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
*/
ReceivedEventData(Azure::Core::Amqp::Models::AmqpMessage const& message);
// Destructor
~ReceivedEventData() = default;
/** @brief Copy an ReceivedEventData to another.
*/
ReceivedEventData(ReceivedEventData const&) = default;
/** @brief Assign an ReceivedEventData to another.
* /
*/
ReceivedEventData& operator=(ReceivedEventData const&) = default;
/** @brief Create an ReceivedEventData moving from another.
*/
ReceivedEventData(ReceivedEventData&&) = default;
/** @brief Move an ReceivedEventData to another.
*/
ReceivedEventData& operator=(ReceivedEventData&&) = default;
/** @brief Get the raw AMQP message.
*
* Returns the underlying AMQP message that was received from the Event Hubs service.

View File

@ -6,10 +6,16 @@
#include "private/eventhubs_constants.hpp"
#include "private/eventhubs_utilities.hpp"
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <iomanip>
#include <iostream>
#include <sstream>
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;
namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
EventData::EventData(Azure::Core::Amqp::Models::AmqpMessage const& message)
@ -86,7 +92,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
{
// The Key in MessageAnnotations is normally an AmqpSymbol, cast it to a string Key when
// placing in SystemProperties.
SystemProperties.emplace(static_cast<std::string>(key), item.second);
std::string keyName = static_cast<std::string>(key);
auto result{SystemProperties.emplace(keyName, item.second)};
if (!result.second)
{
// If the key already exists, log a warning.
Log::Stream(Logger::Level::Warning)
<< "Duplicate key in MessageAnnotations: " << key << std::endl;
}
}
}
}

View File

@ -27,7 +27,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
Azure::Core::Amqp::Models::AmqpValue const& filterValue)
{
Azure::Core::Amqp::Models::AmqpDescribed value{description.Code, filterValue};
sourceOptions.Filter.emplace(description.Name, value);
sourceOptions.Filter.emplace(description.Name, value.AsAmqpValue());
}
FilterDescription SelectorFilter{"apache.org:selector-filter:string", 0x0000468c00000004};

View File

@ -157,9 +157,9 @@ TEST_F(EventDataTest, ReceivedEventData)
{
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::PartitionKeyAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::PartitionKeyAnnotation}
.AsAmqpValue()]
= "PartitionKey";
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.PartitionKey);
@ -176,12 +176,13 @@ TEST_F(EventDataTest, ReceivedEventData)
GTEST_LOG_(INFO) << "timeNow: " << timeNow.ToString();
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::EnqueuedTimeAnnotation})]
= static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpTimestamp{
std::chrono::duration_cast<std::chrono::milliseconds>(timeNow.time_since_epoch())});
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::EnqueuedTimeAnnotation}
.AsAmqpValue()]
= Azure::Core::Amqp::Models::AmqpTimestamp{std::chrono::duration_cast<
std::chrono::milliseconds>(
timeNow.time_since_epoch())}
.AsAmqpValue();
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.EnqueuedTime.HasValue());
GTEST_LOG_(INFO) << "EnqueuedTime: " << receivedEventData.EnqueuedTime.Value().ToString();
@ -193,9 +194,9 @@ TEST_F(EventDataTest, ReceivedEventData)
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::SequenceNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::SequenceNumberAnnotation}
.AsAmqpValue()]
= static_cast<int64_t>(235);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.SequenceNumber);
@ -206,9 +207,9 @@ TEST_F(EventDataTest, ReceivedEventData)
}
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= 54644;
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
@ -219,9 +220,9 @@ TEST_F(EventDataTest, ReceivedEventData)
}
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= "54644";
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
@ -232,9 +233,9 @@ TEST_F(EventDataTest, ReceivedEventData)
}
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<uint32_t>(53);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
@ -245,9 +246,9 @@ TEST_F(EventDataTest, ReceivedEventData)
}
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<int32_t>(57);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
@ -258,9 +259,9 @@ TEST_F(EventDataTest, ReceivedEventData)
}
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<uint64_t>(661011);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
@ -271,9 +272,9 @@ TEST_F(EventDataTest, ReceivedEventData)
}
{
Azure::Core::Amqp::Models::AmqpMessage message;
message.MessageAnnotations[static_cast<Azure::Core::Amqp::Models::AmqpValue>(
Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation})]
message.MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<int64_t>(1412612);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);

View File

@ -111,7 +111,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
ProcessorOptions processorOptions;
processorOptions.LoadBalancingStrategy = processorStrategy;
processorOptions.UpdateInterval = std::chrono::milliseconds(1000);
processorOptions.Prefetch = 1500; // Set the initial link credits to 1500.
Processor processor{consumerClient, checkpointStore, processorOptions};
@ -594,17 +593,17 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
processor.Stop();
}
// The processor balanced and greedy tests fail when run on Linux or Mac. The tests run fine on
// Windows. For now, disable the tests on Linux and Mac.
// The processor balanced and greedy tests fail when run on Linux or Mac. The tests run fine on
// Windows. For now, disable the tests on Linux and Mac.
#if !defined(AZ_PLATFORM_LINUX) && !defined(AZ_PLATFORM_MAC)
TEST_F(ProcessorTest, Processor_Balanced_LIVEONLY_)
{
TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyBalanced);
}
TEST_F(ProcessorTest, Processor_Greedy_LIVEONLY_)
{
TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyGreedy);
}
TEST_F(ProcessorTest, Processor_Balanced_LIVEONLY_)
{
TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyBalanced);
}
TEST_F(ProcessorTest, Processor_Greedy_LIVEONLY_)
{
TestWithLoadBalancer(Models::ProcessorStrategy::ProcessorStrategyGreedy);
}
#endif
#if 0
TEST_F(ProcessorTest, Processor_Balanced_AcquisitionOnly_LIVEONLY_)