diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/management.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/management.hpp index 1729b098b..ec3502da1 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/management.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/management.hpp @@ -3,6 +3,7 @@ #pragma once +#include "models/amqp_error.hpp" #include "models/amqp_message.hpp" #include "session.hpp" @@ -92,8 +93,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { public: /** @brief Called when an error occurs. + * + * @param error - the error which occurred. + * */ - virtual void OnError() = 0; + virtual void OnError(Models::_internal::AmqpError const& error) = 0; }; /** diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp index 5cf6333c9..4f2c6e5b0 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp @@ -41,32 +41,44 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { class MessageReceiver; - struct MessageReceiverOptions + struct MessageReceiverOptions final { + /** @brief The name of the link associated with the message sender. + * + * Links are named so that they can be recovered when communication is interrupted. Link names + * MUST uniquely identify the link amongst all links of the same direction between the two + * participating containers. Link names are only used when attaching a link, so they can be + * arbitrarily long without a significant penalty. + * + */ + std::string Name; - std::vector AuthenticationScopes; + + /** @brief The settle mode for the link associated with the message sender. + * + * This field indicates how the deliveries sent over the link SHOULD be settled. When this + * field is set to "mixed", the unsettled map MUST be sent even if it is empty. When this + * field is set to "settled", the value of the unsettled map MUST NOT be sent. See + * http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#doc-idp145616 + * for more details. + * + */ ReceiverSettleMode SettleMode{ReceiverSettleMode::First}; + + /** @brief The target for the link associated with the message receiver. */ Models::_internal::MessageTarget MessageTarget; - bool EnableTrace{false}; + + /** @brief The initial delivery count for the link associated with the message receiver. */ Nullable InitialDeliveryCount; + + /** @brief The Maximum message size for the link associated with the message receiver. */ Nullable MaxMessageSize; - bool Batching{false}; - std::chrono::seconds BatchMaxAge{std::chrono::seconds(5)}; - std::vector Capabilities; - uint32_t Credit{1}; - LinkDurability Durability{}; - bool DynamicAddress{false}; - ExpiryPolicy SenderExpiryPolicy{}; - ExpiryPolicy ReceiverExpiryPolicy{}; - std::chrono::seconds ExpiryTimeout{std::chrono::seconds(0)}; - // LinkFilter - bool ManualCredits{}; - Models::AmqpValue Properties; + /** @brief If true, the message receiver will generate low level events */ + bool EnableTrace{false}; - std::vector SenderCapabilities; - LinkDurability SenderDurability{}; - std::chrono::seconds SenderExpiryTimeout{std::chrono::seconds(0)}; + /** @brief If true, require that the message sender be authenticated with the service. */ + bool AuthenticationRequired{true}; }; class MessageReceiverEvents { @@ -86,6 +98,28 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { virtual void OnMessageReceiverDisconnected(Models::_internal::AmqpError const& error) = 0; }; + /** @brief MessageReceiver + * + * The MessageReceiver class is responsible for receiving messages from a remote AMQP node. It is + * constructed by the Session::CreateMessageReceiver method. + * + * The message receiver operates in one of two possible models. + * + * In the first model, the message receiver caller registers for incoming messages by providing a + * MessageReceiverEvents callback object, and processes incoming messages in the OnMessageReceived + * method. + * + * In the second model, the caller calls the WaitForIncomingMessage method to wait for the next + * incoming message. + * + * The primary difference between the two models is that the first model allows the caller to + * alter the disposition of a message when it is received, the second model accepts all incoming + * messages. + * + * @remarks If the caller provides a MessageReceiverEvents callback, then the + * WaitForIncomingMessage API will throw an exception. + * + */ class MessageReceiver final { public: ~MessageReceiver() noexcept; @@ -95,11 +129,35 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { MessageReceiver(MessageReceiver&&) = default; MessageReceiver& operator=(MessageReceiver&&) = default; + /** @brief Opens the message receiver. + * + * @param context The context for cancelling operations. + */ void Open(Context const& context = {}); + + /** @brief Closes the message receiver. + * + */ void Close(); + + /** @brief Gets the name of the underlying link. + * + * @return The name of the underlying link object. + */ std::string GetLinkName() const; + + /** @brief Gets the Address of the message receiver's source node. + * + * @return The name of the source node. + */ std::string GetSourceName() const; + /** @brief Waits until a message has been received. + * + * @param context The context for cancelling operations. + * + * @return A pair of the received message and the error if any. + */ std::pair, Models::_internal::AmqpError> WaitForIncomingMessage(Context const& context = {}); diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp index 010717080..1170df439 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp @@ -102,6 +102,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { /** @brief If true, the message sender will log trace events. */ bool EnableTrace{false}; + + /** @brief If true, require that the message sender be authenticated with the service. */ + bool AuthenticationRequired{true}; }; class MessageSender final { diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp index 2842928f8..848ef0303 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/models/amqp_value.hpp @@ -579,7 +579,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { public: /** @brief Convert this collection type to an AMQP value.*/ - operator AmqpValue() const { return static_cast(*this).get(); } + explicit operator AmqpValue() const + { + return static_cast(*this).get(); + } /** @brief Returns the size of the underlying value.*/ inline typename T::size_type size() const { return m_value.size(); } @@ -787,7 +790,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { /** * @brief Convert an existing AmqpSymbol to an AmqpValue. */ - operator AmqpValue() const; + explicit operator AmqpValue() const; /** * @brief Convert an AmqpSymbol instance to a uAMQP AMQP_VALUE. @@ -839,6 +842,19 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpComposite(AMQP_VALUE_DATA_TAG* const value); + /** @brief Compare this AmqpComposite value with another. + * + * @param that - the AmqpComposite to compare with. + */ + bool operator==(AmqpComposite const& that) const + { + if (GetDescriptor() == that.GetDescriptor()) + { + return m_value == that.m_value; + } + return false; + } + /** @brief Returns the descriptor for this composite type. * * @returns The descriptor for this composite type. @@ -858,7 +874,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { /** * @brief Convert an existing AmqpComposite to an AmqpValue. */ - operator AmqpValue() const { return static_cast<_detail::UniqueAmqpValueHandle>(*this).get(); } + explicit operator AmqpValue() const + { + return static_cast<_detail::UniqueAmqpValueHandle>(*this).get(); + } private: AmqpValue m_descriptor; @@ -905,10 +924,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { */ AmqpDescribed(AMQP_VALUE_DATA_TAG* const value); + /** @brief Compare this AmqpDescribed value with another. + * + * @param that - the AmqpDescribed to compare with. + */ + bool operator==(AmqpDescribed const& that) const + { + if (GetDescriptor() == that.GetDescriptor()) + { + return GetValue() == that.GetValue(); + } + return false; + } + /** * @brief Convert an existing AmqpComposite to an AmqpValue. */ - operator AmqpValue const() const; + explicit operator AmqpValue const() const; /** @brief Returns the descriptor for this composite type. * @@ -944,5 +976,4 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { AmqpValue m_descriptor; AmqpValue m_value; }; - }}}} // namespace Azure::Core::Amqp::Models diff --git a/sdk/core/azure-core-amqp/samples/internal/eventhub_get_properties_sample/eventhub_get_eventhub_properties_sample.cpp b/sdk/core/azure-core-amqp/samples/internal/eventhub_get_properties_sample/eventhub_get_eventhub_properties_sample.cpp index 79ae9202a..3bdffa544 100644 --- a/sdk/core/azure-core-amqp/samples/internal/eventhub_get_properties_sample/eventhub_get_eventhub_properties_sample.cpp +++ b/sdk/core/azure-core-amqp/samples/internal/eventhub_get_properties_sample/eventhub_get_eventhub_properties_sample.cpp @@ -91,7 +91,7 @@ struct EventHubPartitionProperties final bool IsEmpty{}; }; -EventHubPartitionProperties GetPartitionProperties( +std::tuple GetPartitionProperties( Azure::Core::Amqp::_internal::Session const& session, std::string const& eventHubName, std::string const& partitionId) @@ -119,9 +119,11 @@ EventHubPartitionProperties GetPartitionProperties( message); EventHubPartitionProperties properties; + bool error{false}; if (result.Status == Azure::Core::Amqp::_internal::ManagementOperationStatus::Error) { - std::cerr << "Error: " << result.Message.ApplicationProperties["status-description"]; + std::cerr << "Error: " << result.Description; + error = true; } else { @@ -150,7 +152,7 @@ EventHubPartitionProperties GetPartitionProperties( } managementClient.Close(); - return properties; + return std::make_tuple(error, properties); } int main() @@ -194,16 +196,21 @@ int main() { std::cout << "Partition: " << partition << std::endl; auto partitionProperties = GetPartitionProperties(session, eventhubsEntity, partition); - std::cout << "Partition properties: " << std::endl; - std::cout << " Name: " << partitionProperties.Name << std::endl; - std::cout << " PartitionId: " << partitionProperties.PartitionId << std::endl; - std::cout << " BeginningSequenceNumber: " << partitionProperties.BeginningSequenceNumber - << std::endl; - std::cout << " LastEnqueuedSequenceNumber: " << partitionProperties.LastEnqueuedSequenceNumber - << std::endl; - std::cout << " LastEnqueuedOffset: " << partitionProperties.LastEnqueuedOffset << std::endl; - std::cout << " LastEnqueuedTimeUtc: " << partitionProperties.LastEnqueuedTimeUtc.ToString() - << std::endl; - std::cout << " IsEmpty: " << std::boolalpha << partitionProperties.IsEmpty << std::endl; + if (!std::get<0>(partitionProperties)) + { + std::cout << "Partition properties: " << std::endl; + std::cout << " Name: " << std::get<1>(partitionProperties).Name << std::endl; + std::cout << " PartitionId: " << std::get<1>(partitionProperties).PartitionId << std::endl; + std::cout << " BeginningSequenceNumber: " + << std::get<1>(partitionProperties).BeginningSequenceNumber << std::endl; + std::cout << " LastEnqueuedSequenceNumber: " + << std::get<1>(partitionProperties).LastEnqueuedSequenceNumber << std::endl; + std::cout << " LastEnqueuedOffset: " << std::get<1>(partitionProperties).LastEnqueuedOffset + << std::endl; + std::cout << " LastEnqueuedTimeUtc: " + << std::get<1>(partitionProperties).LastEnqueuedTimeUtc.ToString() << std::endl; + std::cout << " IsEmpty: " << std::boolalpha << std::get<1>(partitionProperties).IsEmpty + << std::endl; + } } } diff --git a/sdk/core/azure-core-amqp/src/amqp/management.cpp b/sdk/core/azure-core-amqp/src/amqp/management.cpp index ac4b91e2c..9fbbe392a 100644 --- a/sdk/core/azure-core-amqp/src/amqp/management.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/management.cpp @@ -5,6 +5,7 @@ #include "azure/core/amqp/connection.hpp" #include "azure/core/amqp/models/amqp_message.hpp" +#include "azure/core/amqp/models/messaging_values.hpp" #include "azure/core/amqp/session.hpp" #include "private/connection_impl.hpp" #include "private/management_impl.hpp" @@ -23,11 +24,13 @@ using namespace Azure::Core::Diagnostics::_internal; using namespace Azure::Core::Diagnostics; +#if UAMQP_MANAGEMENT_CLIENT void Azure::Core::_internal::UniqueHandleHelper::FreeAmqpManagement( AMQP_MANAGEMENT_HANDLE value) { amqp_management_destroy(value); } +#endif namespace Azure { namespace Core { namespace Amqp { namespace _internal { @@ -71,9 +74,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } ManagementClientImpl::~ManagementClientImpl() noexcept { m_eventHandler = nullptr; } +#if UAMQP_MANAGEMENT_CLIENT void ManagementClientImpl::CreateManagementClient() { - m_management.reset(amqp_management_create(*m_session, m_options.ManagementNodeName.c_str())); if (!m_management) { @@ -100,13 +103,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } } } +#endif _internal::ManagementOpenStatus ManagementClientImpl::Open(Context const& context) { /** Authentication needs to happen *before* the management object is created. */ m_session->AuthenticateIfNeeded( m_managementEntityPath + "/" + m_options.ManagementNodeName, context); - +#if UAMQP_MANAGEMENT_CLIENT CreateManagementClient(); if (amqp_management_open_async( @@ -134,6 +138,51 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } } throw Azure::Core::OperationCancelledException("Management Open operation was cancelled."); +#else + { + _internal::MessageSenderOptions messageSenderOptions; + messageSenderOptions.EnableTrace = m_options.EnableTrace; + messageSenderOptions.MessageSource = m_options.ManagementNodeName; + messageSenderOptions.Name = m_options.ManagementNodeName + "-sender"; + messageSenderOptions.AuthenticationRequired = false; + + m_messageSender = std::make_shared( + m_session, m_options.ManagementNodeName, messageSenderOptions, this); + } + { + _internal::MessageReceiverOptions messageReceiverOptions; + messageReceiverOptions.EnableTrace = m_options.EnableTrace; + messageReceiverOptions.MessageTarget = m_options.ManagementNodeName; + messageReceiverOptions.Name = m_options.ManagementNodeName + "-receiver"; + messageReceiverOptions.AuthenticationRequired = false; + + m_messageReceiver = std::make_shared( + m_session, m_options.ManagementNodeName, messageReceiverOptions, this); + } + + // Now open the message sender and receiver. + SetState(ManagementState::Opening); + m_messageSender->Open(context); + m_messageReceiver->Open(context); + + // And finally, wait for the message sender and receiver to finish opening before we return. + auto result = m_openCompleteQueue.WaitForPolledResult(context, *m_session->GetConnection()); + if (result) + { + // If the message sender or receiver failed to open, we need to close them + _internal::ManagementOpenStatus rv = std::get<0>(*result); + if (rv != _internal::ManagementOpenStatus::Ok) + { + m_messageSender->Close(); + m_messageSenderOpen = false; + m_messageReceiver->Close(); + m_messageReceiverOpen = false; + } + return rv; + } + // If result is null, then it means that the context was cancelled. + throw Azure::Core::OperationCancelledException("Management Open operation was cancelled."); +#endif // UAMQP_MANAGEMENT_CLIENT } _internal::ManagementOperationResult ManagementClientImpl::ExecuteOperation( @@ -148,6 +197,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { messageToSend.ApplicationProperties["security_token"] = Models::AmqpValue{token}; } +#if UAMQP_MANAGEMENT_CLIENT if (!amqp_management_execute_operation_async( m_management.get(), operationToPerform.c_str(), @@ -174,10 +224,83 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { throw Azure::Core::OperationCancelledException("Management operation cancelled."); } +#else + messageToSend.ApplicationProperties.emplace("operation", operationToPerform); + messageToSend.ApplicationProperties.emplace("type", typeOfOperation); + if (!locales.empty()) + { + messageToSend.ApplicationProperties.emplace("locales", locales); + } + messageToSend.Properties.MessageId = m_nextMessageId; + m_expectedMessageId = m_nextMessageId; + m_sendCompleted = false; + m_nextMessageId++; + + m_messageSender->QueueSend( + messageToSend, + [&](_internal::MessageSendStatus sendStatus, Models::AmqpValue const& deliveryState) { + m_sendCompleted = true; + Log::Stream(Logger::Level::Informational) + << "Management operation send complete. Status: " << static_cast(sendStatus) + << ", DeliveryState: " << deliveryState; + if (sendStatus != _internal::MessageSendStatus::Ok) + { + std::string errorDescription = "Send failed."; + auto deliveryStateAsList{deliveryState.AsList()}; + Models::AmqpValue firstState{deliveryStateAsList[0]}; + ERROR_HANDLE errorHandle; + if (!amqpvalue_get_error(firstState, &errorHandle)) + { + Models::_internal::UniqueAmqpErrorHandle uniqueError{ + errorHandle}; // This will free the error handle when it goes out of scope. + Models::_internal::AmqpError error{ + Models::_internal::AmqpErrorFactory::FromUamqp(errorHandle)}; + errorDescription = error.Description; + } + m_messageQueue.CompleteOperation( + _internal::ManagementOperationStatus::Error, + 500, + errorDescription, + Models::AmqpMessage{}); + } + }, + context); + auto result = m_messageQueue.WaitForPolledResult(context, *m_session->GetConnection()); + if (result) + { + _internal::ManagementOperationResult rv; + rv.Status = std::get<0>(*result); + rv.StatusCode = std::get<1>(*result); + rv.Description = std::get<2>(*result); + rv.Message = std::get<3>(*result); + return rv; + } + else + { + throw Azure::Core::OperationCancelledException("Management operation cancelled."); + } +#endif } - void ManagementClientImpl::Close() { amqp_management_close(m_management.get()); } + void ManagementClientImpl::SetState(ManagementState newState) { m_state = newState; } + void ManagementClientImpl::Close() + { +#if UAMQP_MANAGEMENT_CLIENT + amqp_management_close(m_management.get()); +#endif + SetState(ManagementState::Closing); + if (m_messageSender && m_messageSenderOpen) + { + m_messageSender->Close(); + } + if (m_messageReceiver && m_messageReceiverOpen) + { + m_messageReceiver->Close(); + } + } + +#if UAMQP_MANAGEMENT_IMPLEMENTATION void ManagementClientImpl::OnOpenCompleteFn(void* context, AMQP_MANAGEMENT_OPEN_RESULT openResult) { ManagementClientImpl* management = static_cast(context); @@ -188,7 +311,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } management->m_openCompleteQueue.CompleteOperation(openResult); } - void ManagementClientImpl::OnManagementErrorFn(void* context) { ManagementClientImpl* management = static_cast(context); @@ -249,5 +371,328 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { throw std::runtime_error("Unknown management status."); // LCOV_EXCL_LINE } } +#else + + void ManagementClientImpl::OnMessageSenderStateChanged( + _internal::MessageSender const&, + _internal::MessageSenderState newState, + _internal::MessageSenderState oldState) + { + if (newState == oldState) + { + Log::Stream(Logger::Level::Verbose) + << "OnMessageSenderStateChanged: newState == oldState" << std::endl; + return; + } + + switch (m_state) + { + case ManagementState::Opening: + switch (newState) + { + // If the message sender is opening, we don't need to do anything. + case _internal::MessageSenderState::Opening: + break; + // If the message sender is open, remember it. If the message receiver is also + // open, complete the outstanding open. + case _internal::MessageSenderState::Open: + m_messageSenderOpen = true; + if (m_messageReceiverOpen) + { + SetState(ManagementState::Open); + m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Ok); + } + break; + // If the message sender is transitioning to an error or state other than open, + // it's an error. + default: + case _internal::MessageSenderState::Idle: + case _internal::MessageSenderState::Closing: + case _internal::MessageSenderState::Error: + Log::Stream(Logger::Level::Error) + << "Message Sender Changed State to " << static_cast(newState) + << " while management client is opening"; + SetState(ManagementState::Closing); + m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Error); + break; + } + break; + case ManagementState::Open: + switch (newState) + { + // If the message sender goes to a non-open state, it's an error. + default: + case _internal::MessageSenderState::Idle: + case _internal::MessageSenderState::Closing: + case _internal::MessageSenderState::Error: + Log::Stream(Logger::Level::Error) + << "Message Sender Changed State to " << static_cast(newState) + << " while management client is open"; + SetState(ManagementState::Closing); + if (m_eventHandler) + { + m_eventHandler->OnError(Models::_internal::AmqpError{}); + } + break; + // Ignore message sender open changes. + case _internal::MessageSenderState::Open: + break; + } + break; + case ManagementState::Closing: + switch (newState) + { + // If the message sender goes to a non-open state, it's an error. + default: + case _internal::MessageSenderState::Open: + case _internal::MessageSenderState::Opening: + case _internal::MessageSenderState::Error: + Log::Stream(Logger::Level::Error) + << "Message Sender Changed State to " << static_cast(newState) + << " while management client is closing"; + SetState(ManagementState::Closing); + if (m_eventHandler) + { + m_eventHandler->OnError(Models::_internal::AmqpError{}); + } + break; + // Ignore message sender closing or idle state changes. + case _internal::MessageSenderState::Idle: + case _internal::MessageSenderState::Closing: + break; + } + break; + case ManagementState::Idle: + case ManagementState::Error: + Log::Stream(Logger::Level::Error) + << "Message sender state changed to " << static_cast(newState) + << " when management client is in the error state, ignoring."; + break; + } + } + + void ManagementClientImpl::OnMessageSenderDisconnected(Models::_internal::AmqpError const& error) + { + Log::Stream(Logger::Level::Error) << "Message sender disconnected: " << error << std::endl; + SetState(ManagementState::Error); + if (m_eventHandler) + { + m_eventHandler->OnError(error); + } + } + + void ManagementClientImpl::OnMessageReceiverStateChanged( + _internal::MessageReceiver const&, + _internal::MessageReceiverState newState, + _internal::MessageReceiverState oldState) + { + if (newState == oldState) + { + Log::Stream(Logger::Level::Error) + << "OnMessageReceiverStateChanged: newState == oldState" << std::endl; + return; + } + + switch (m_state) + { + case ManagementState::Opening: + switch (newState) + { + // If the message sender is opening, we don't need to do anything. + case _internal::MessageReceiverState::Opening: + break; + // If the message receiver is open, remember it. If the message sender is also + // open, complete the outstanding open. + case _internal::MessageReceiverState::Open: + m_messageReceiverOpen = true; + if (m_messageSenderOpen) + { + SetState(ManagementState::Open); + m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Ok); + } + break; + // If the message receiver is transitioning to an error or state other than open, + // it's an error. + default: + case _internal::MessageReceiverState::Idle: + case _internal::MessageReceiverState::Closing: + case _internal::MessageReceiverState::Error: + Log::Stream(Logger::Level::Error) + << "Message Receiver Changed State to " << static_cast(newState) + << " while management client is opening"; + SetState(ManagementState::Closing); + m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Error); + break; + } + break; + case ManagementState::Open: + switch (newState) + { + // If the message sender goes to a non-open state, it's an error. + default: + case _internal::MessageReceiverState::Idle: + case _internal::MessageReceiverState::Closing: + case _internal::MessageReceiverState::Error: + Log::Stream(Logger::Level::Error) + << "Message Sender Changed State to " << static_cast(newState) + << " while management client is open"; + SetState(ManagementState::Closing); + if (m_eventHandler) + { + m_eventHandler->OnError(Models::_internal::AmqpError{}); + } + break; + // Ignore message sender open changes. + case _internal::MessageReceiverState::Open: + break; + } + break; + case ManagementState::Closing: + switch (newState) + { + // If the message sender goes to a non-open state, it's an error. + default: + case _internal::MessageReceiverState::Open: + case _internal::MessageReceiverState::Opening: + case _internal::MessageReceiverState::Error: + Log::Stream(Logger::Level::Error) + << "Message Sender Changed State to " << static_cast(newState) + << " while management client is closing"; + SetState(ManagementState::Closing); + if (m_eventHandler) + { + m_eventHandler->OnError(Models::_internal::AmqpError{}); + } + break; + // Ignore message sender closing or idle state changes. + case _internal::MessageReceiverState::Idle: + case _internal::MessageReceiverState::Closing: + break; + } + break; + case ManagementState::Idle: + case ManagementState::Error: + Log::Stream(Logger::Level::Error) + << "Message sender state changed to " << static_cast(newState) + << " when management client is in the error state, ignoring."; + break; + } + } + + Models::AmqpValue ManagementClientImpl::IndicateError( + std::string const& condition, + std::string const& description) + { + Log::Stream(Logger::Level::Error) + << "Indicate Management Error: " << condition << " " << description; + if (m_eventHandler) + { + // Let external callers know that the error was triggered. + Models::_internal::AmqpError error; + error.Condition = Models::_internal::AmqpErrorCondition(condition); + error.Description = "Message Delivery Rejected: " + description; + m_eventHandler->OnError(error); + } + + // Complete any outstanding receives with an error. + m_messageQueue.CompleteOperation( + _internal::ManagementOperationStatus::Error, 500, description, Models::AmqpMessage()); + + return Models::_internal::Messaging::DeliveryRejected(condition, description); + } + + Models::AmqpValue ManagementClientImpl::OnMessageReceived( + _internal::MessageReceiver const&, + Models::AmqpMessage const& message) + { + if (message.ApplicationProperties.empty()) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message does not have application properties."); + } + if (!message.Properties.CorrelationId.HasValue()) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message correlation ID not found."); + } + else if (message.Properties.CorrelationId.Value().GetType() != Models::AmqpValueType::Ulong) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message correlation ID is not a ulong."); + } + uint64_t correlationId = message.Properties.CorrelationId.Value(); + + auto statusCodeMap = message.ApplicationProperties.find(m_options.ExpectedStatusCodeKeyName); + if (statusCodeMap == message.ApplicationProperties.end()) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message does not have a " + m_options.ExpectedStatusCodeKeyName + + " status code key."); + } + else if (statusCodeMap->second.GetType() != Models::AmqpValueType::Int) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message " + m_options.ExpectedStatusCodeKeyName + " value is not an int."); + } + int32_t statusCode = statusCodeMap->second; + + // If the message has a status description, remember it. + auto statusDescription + = message.ApplicationProperties.find(m_options.ExpectedStatusDescriptionKeyName); + std::string description; + if (statusDescription != message.ApplicationProperties.end()) + { + if (statusDescription->second.GetType() != Models::AmqpValueType::String) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message " + m_options.ExpectedStatusDescriptionKeyName + + " value is not a string."); + } + description = static_cast(statusDescription->second); + } + + if (correlationId != m_expectedMessageId) + { + return IndicateError( + Models::_internal::AmqpErrorCondition::InternalError.ToString(), + "Received message correlation ID does not match request ID."); + } + if (!m_sendCompleted) + { + Log::Stream(Logger::Level::Error) << "Received message before send completed."; + } + + // AMQP management statusCode values are [RFC + // 2616](https://www.rfc-editor.org/rfc/rfc2616#section-6.1.1) status codes. + if ((statusCode < 200) || (statusCode > 299)) + { + m_messageQueue.CompleteOperation( + _internal::ManagementOperationStatus::FailedBadStatus, statusCode, description, message); + } + else + { + m_messageQueue.CompleteOperation( + _internal::ManagementOperationStatus::Ok, statusCode, description, message); + } + return Models::_internal::Messaging::DeliveryAccepted(); + } + + void ManagementClientImpl::OnMessageReceiverDisconnected( + Models::_internal::AmqpError const& error) + { + Log::Stream(Logger::Level::Error) << "Message receiver disconnected: " << error << std::endl; + SetState(ManagementState::Error); + if (m_eventHandler) + { + m_eventHandler->OnError(error); + } + } +#endif }}}} // namespace Azure::Core::Amqp::_detail diff --git a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp index 5b9356738..13fa418e4 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp @@ -161,6 +161,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { std::pair, Models::_internal::AmqpError> MessageReceiverImpl::WaitForIncomingMessage(Context const& context) { + if (m_eventHandler) + { + throw std::runtime_error("Cannot call WaitForIncomingMessage when using an event handler."); + } auto result = m_messageQueue.WaitForPolledResult(context, *m_session->GetConnection()); if (result) { @@ -261,7 +265,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { void MessageReceiverImpl::Open(Azure::Core::Context const& context) { - m_session->AuthenticateIfNeeded(static_cast(m_source.GetAddress()), context); + if (m_options.AuthenticationRequired) + { + m_session->AuthenticateIfNeeded(static_cast(m_source.GetAddress()), context); + } // Once we've authenticated the connection, establish the link and receiver. // We cannot do this before authenticating the client. diff --git a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp index 7873770f1..4fd98f4bd 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp @@ -188,9 +188,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { void MessageSenderImpl::Open(Context const& context) { - // If we need to authenticate with either ServiceBus or BearerToken, now is the time to do - // it. - m_session->AuthenticateIfNeeded(static_cast(m_target.GetAddress()), context); + if (m_options.AuthenticationRequired) + { + // If we need to authenticate with either ServiceBus or BearerToken, now is the time to do + // it. + m_session->AuthenticateIfNeeded(static_cast(m_target.GetAddress()), context); + } if (m_link == nullptr) { diff --git a/sdk/core/azure-core-amqp/src/amqp/private/management_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/management_impl.hpp index c9b2c5239..e417fbae9 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/management_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/management_impl.hpp @@ -8,6 +8,7 @@ #include "azure/core/amqp/session.hpp" #include "connection_impl.hpp" #include "message_receiver_impl.hpp" +#include "message_sender_impl.hpp" #include "session_impl.hpp" #include @@ -15,8 +16,9 @@ #include #include +#include #include - +#if UAMQP_MANAGEMENT_IMPLEMENTATION template <> struct Azure::Core::_internal::UniqueHandleHelper { static void FreeAmqpManagement(AMQP_MANAGEMENT_INSTANCE_TAG* obj); @@ -27,6 +29,7 @@ template <> struct Azure::Core::_internal::UniqueHandleHelper; +#endif namespace Azure { namespace Core { namespace Amqp { namespace _detail { @@ -45,7 +48,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } }; - class ManagementClientImpl final : public ::std::enable_shared_from_this { + class ManagementClientImpl final : public ::std::enable_shared_from_this, + public _internal::MessageSenderEvents, + public _internal::MessageReceiverEvents { public: ManagementClientImpl( std::shared_ptr session, @@ -76,15 +81,46 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { Context const& context); private: + enum class ManagementState + { + Idle, + Opening, + Closing, + Open, + Error + }; +#if UAMQP_MANAGEMENT_IMPLEMENTATION UniqueAmqpManagementHandle m_management{}; + Azure::Core::Amqp::Common::_internal::AsyncOperationQueue + m_openCompleteQueue; +#else + std::shared_ptr m_messageSender; + std::shared_ptr m_messageReceiver; + ManagementState m_state = ManagementState::Idle; + bool m_messageSenderOpen{false}; + bool m_messageReceiverOpen{false}; + Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<_internal::ManagementOpenStatus> + m_openCompleteQueue; + + uint64_t m_nextMessageId{0}; + + // What is the message ID expected for the current outstanding operation? + uint64_t m_expectedMessageId; + bool m_sendCompleted{false}; + + void SetState(ManagementState newState); + // Reflect the error state to the OnError callback and return a delivery rejected status. + Models::AmqpValue IndicateError( + std::string const& errorCondition, + std::string const& errorDescription); + +#endif std::string m_managementNodeName; _internal::ManagementClientOptions m_options; std::string m_source; std::shared_ptr m_session; _internal::ManagementClientEvents* m_eventHandler{}; std::string m_managementEntityPath; - Azure::Core::Amqp::Common::_internal::AsyncOperationQueue - m_openCompleteQueue; Azure::Core::Amqp::Common::_internal::AsyncOperationQueue< _internal::ManagementOperationStatus, @@ -93,8 +129,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { Models::AmqpMessage> m_messageQueue; +#if UAMQP_MANAGEMENT_IMPLEMENTATION void CreateManagementClient(); - static void OnExecuteOperationCompleteFn( void* context, AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT executeResult, @@ -103,5 +139,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { MESSAGE_HANDLE messageHandle); static void OnManagementErrorFn(void* context); static void OnOpenCompleteFn(void* context, AMQP_MANAGEMENT_OPEN_RESULT openResult); +#else + + // Inherited via MessageSenderEvents + virtual void OnMessageSenderStateChanged( + _internal::MessageSender const& sender, + _internal::MessageSenderState newState, + _internal::MessageSenderState oldState) override; + virtual void OnMessageSenderDisconnected(Models::_internal::AmqpError const& error) override; + + // Inherited via MessageReceiverEvents + virtual void OnMessageReceiverStateChanged( + _internal::MessageReceiver const& receiver, + _internal::MessageReceiverState newState, + _internal::MessageReceiverState oldState) override; + virtual Models::AmqpValue OnMessageReceived( + _internal::MessageReceiver const& receiver, + Models::AmqpMessage const& message) override; + virtual void OnMessageReceiverDisconnected(Models::_internal::AmqpError const& error) override; +#endif }; }}}} // namespace Azure::Core::Amqp::_detail diff --git a/sdk/core/azure-core-amqp/src/common/global_state.cpp b/sdk/core/azure-core-amqp/src/common/global_state.cpp index 17596f0e1..e9f97cf9c 100644 --- a/sdk/core/azure-core-amqp/src/common/global_state.cpp +++ b/sdk/core/azure-core-amqp/src/common/global_state.cpp @@ -57,7 +57,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace // information gets in the way of the message. if (logCategory == AZ_LOG_TRACE && (strcmp(func, "log_outgoing_frame") == 0 || strcmp(func, "log_incoming_frame") == 0 - || strcmp(func, "log_message_chunk") == 0)) + || strcmp(func, "log_message_chunk") == 0 || strcmp(func, "_log_outgoing_frame") == 0 + || strcmp(func, "_log_incoming_frame") == 0)) { } else diff --git a/sdk/core/azure-core-amqp/src/models/amqp_message.cpp b/sdk/core/azure-core-amqp/src/models/amqp_message.cpp index f66633ced..d4daae49d 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_message.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_message.cpp @@ -455,15 +455,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { // described body. AmqpDescribed describedBody( static_cast(AmqpDescriptors::DataAmqpValue), message.m_amqpValueBody); - auto serializedBodyValue = AmqpValue::Serialize(describedBody); + auto serializedBodyValue = AmqpValue::Serialize(static_cast(describedBody)); rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end()); } break; case MessageBodyType::Data: for (auto const& val : message.m_binaryDataBody) { - AmqpDescribed describedBody(static_cast(AmqpDescriptors::DataBinary), val); - auto serializedBodyValue = AmqpValue::Serialize(describedBody); + AmqpDescribed describedBody( + static_cast(AmqpDescriptors::DataBinary), static_cast(val)); + auto serializedBodyValue = AmqpValue::Serialize(static_cast(describedBody)); rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end()); } break; @@ -471,8 +472,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { for (auto const& val : message.m_amqpSequenceBody) { AmqpDescribed describedBody( - static_cast(AmqpDescriptors::DataAmqpSequence), val); - auto serializedBodyValue = AmqpValue::Serialize(describedBody); + static_cast(AmqpDescriptors::DataAmqpSequence), + static_cast(val)); + auto serializedBodyValue = AmqpValue::Serialize(static_cast(describedBody)); rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end()); } } diff --git a/sdk/core/azure-core-amqp/src/models/amqp_value.cpp b/sdk/core/azure-core-amqp/src/models/amqp_value.cpp index 353395286..ffde1eabd 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_value.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_value.cpp @@ -251,6 +251,38 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { bool AmqpValue::operator==(AmqpValue const& that) const { + // If both values are null, they are equal. + if (IsNull() && that.IsNull()) + { + return true; + } + // If only one of the values is null, they are not equal. + if (IsNull() || that.IsNull()) + { + return false; + } + // If the types are not equal, they are not equal. + if (GetType() != that.GetType()) + { + return false; + } + // The uAMQP function `amqpvalue_are_equal` does not work for all types, so we need to do some + // special handling for those which are not handled properly. + if (GetType() == AmqpValueType::Composite || GetType() == AmqpValueType::Described) + { + if (GetType() == AmqpValueType::Composite) + { + AmqpComposite thisComposite{this->AsComposite()}; + AmqpComposite thatComposite{that.AsComposite()}; + return (thisComposite == thatComposite); + } + else + { + AmqpDescribed thisDescribed{this->AsDescribed()}; + AmqpDescribed thatDescribed{that.AsDescribed()}; + return (thisDescribed == thatDescribed); + } + } return amqpvalue_are_equal(m_value.get(), that.m_value.get()); } diff --git a/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp b/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp index e60b8b410..23dcfac63 100644 --- a/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/amqp_message_tests.cpp @@ -21,6 +21,8 @@ TEST_F(TestMessage, SimpleCreate) { AmqpMessage message1; + message1.Properties.MessageId = 12345; + message1.SetBody("Hello world"); AmqpMessage message2(std::move(message1)); AmqpMessage message3(message2); AmqpMessage message4; @@ -126,7 +128,7 @@ TEST_F(TestMessage, TestBodyAmqpSequence) { AmqpMessage message; - message.SetBody({"Test", 95, AmqpMap{{3, 5}, {4, 9}}}); + message.SetBody({"Test", 95, static_cast(AmqpMap{{3, 5}, {4, 9}})}); EXPECT_EQ(1, message.GetBodyAsAmqpList().size()); EXPECT_EQ("Test", static_cast(message.GetBodyAsAmqpList()[0].at(0))); @@ -145,7 +147,8 @@ TEST_F(TestMessage, TestBodyAmqpSequence) } { AmqpMessage message; - message.SetBody({{1}, {"Test", 3}, {"Test", 95, AmqpMap{{3, 5}, {4, 9}}}}); + message.SetBody( + {{1}, {"Test", 3}, {"Test", 95, static_cast(AmqpMap{{3, 5}, {4, 9}})}}); EXPECT_EQ(3, message.GetBodyAsAmqpList().size()); EXPECT_EQ("Test", static_cast(message.GetBodyAsAmqpList()[1].at(0))); EXPECT_EQ(95, static_cast(message.GetBodyAsAmqpList()[2].at(1))); @@ -223,7 +226,7 @@ TEST_F(MessageSerialization, SerializeMessageBodyBinary) std::vector buffer; AmqpMessage message; message.Properties.MessageId = "12345"; - message.SetBody(AmqpMap{{"key1", "value1"}, {"key2", "value2"}}); + message.SetBody(static_cast(AmqpMap{{"key1", "value1"}, {"key2", "value2"}})); buffer = AmqpMessage::Serialize(message); AmqpMessage deserialized = AmqpMessage::Deserialize(buffer.data(), buffer.size()); EXPECT_EQ(message, deserialized); diff --git a/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp b/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp index 4d13b4578..6eaed5e08 100644 --- a/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/amqp_value_tests.cpp @@ -2,6 +2,7 @@ // SPDX-Licence-Identifier: MIT #include "azure/core/amqp/common/global_state.hpp" +#include "azure/core/amqp/models/amqp_error.hpp" #include "azure/core/amqp/models/amqp_value.hpp" #include @@ -261,6 +262,42 @@ TEST_F(TestValues, TestList) EXPECT_EQ(AmqpValue('a'), list2.at(3)); EXPECT_FALSE(list1 < list2); } + + { + AmqpList test; + AmqpDescribed desc{ + static_cast(29), + static_cast(AmqpList{AmqpValue{"test:error"}, AmqpValue{"test description"}})}; + test.push_back(AmqpValue{desc}); + EXPECT_EQ(1, test.size()); + EXPECT_EQ(AmqpValueType::Described, test[0].GetType()); + + AmqpList list2{test}; + EXPECT_EQ(1, list2.size()); + EXPECT_EQ(AmqpValueType::Described, list2[0].GetType()); + AmqpDescribed desc2{list2[0].AsDescribed()}; + EXPECT_EQ(desc2.GetDescriptor(), AmqpValue{static_cast(29ll)}); + } + + { + AmqpList test; + _internal::AmqpError error; + error.Condition = _internal::AmqpErrorCondition("test:error"); + error.Description = "test description"; + test.push_back(_internal::AmqpErrorFactory::ToAmqp(error)); + EXPECT_EQ(1, test.size()); + EXPECT_EQ(AmqpValueType::Composite, test[0].GetType()); + AmqpComposite testAsComposite{test[0].AsComposite()}; + EXPECT_EQ(testAsComposite.GetDescriptor(), AmqpValue{static_cast(29ll)}); + { + AmqpValue testAsValue{test}; + EXPECT_EQ(AmqpValueType::List, testAsValue.GetType()); + + auto testAsList{testAsValue.AsList()}; + EXPECT_EQ(AmqpValueType::Composite, testAsList[0].GetType()); + EXPECT_EQ(test[0], testAsList[0]); + } + } } TEST_F(TestValues, TestMap) @@ -308,7 +345,7 @@ TEST_F(TestValues, TestArray) EXPECT_EQ(5, array1.size()); - AmqpValue value = array1; + AmqpValue value = static_cast(array1); EXPECT_EQ(AmqpValueType::Array, value.GetType()); const AmqpArray array2 = value.AsArray(); @@ -398,7 +435,7 @@ TEST_F(TestValues, TestCompositeValue) // Put some things in the map. { AmqpComposite compositeVal(static_cast(116ull), {25, 25.0f}); - AmqpValue value = compositeVal; + AmqpValue value = static_cast(compositeVal); AmqpComposite testVal(value.AsComposite()); EXPECT_EQ(compositeVal.size(), testVal.size()); @@ -419,7 +456,7 @@ TEST_F(TestValues, TestDescribed) EXPECT_EQ(AmqpSymbol("My Composite Type"), described1.GetDescriptor().AsSymbol()); EXPECT_EQ(5, static_cast(described1.GetValue())); - AmqpValue value = described1; + AmqpValue value = static_cast(described1); EXPECT_EQ(AmqpValueType::Described, value.GetType()); AmqpDescribed described2 = value.AsDescribed(); @@ -436,7 +473,7 @@ TEST_F(TestValues, TestDescribed) EXPECT_EQ(937, static_cast(value.GetDescriptor())); EXPECT_EQ(5, static_cast(value.GetValue())); - AmqpValue value2 = value; + AmqpValue value2 = static_cast(value); AmqpDescribed described2 = value2.AsDescribed(); EXPECT_EQ(AmqpValueType::Described, value2.GetType()); diff --git a/sdk/core/azure-core-amqp/test/ut/management_tests.cpp b/sdk/core/azure-core-amqp/test/ut/management_tests.cpp index 2eb3ce4d0..e039c3d65 100644 --- a/sdk/core/azure-core-amqp/test/ut/management_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/management_tests.cpp @@ -59,7 +59,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { Session session{connection.CreateSession({})}; ManagementClientOptions options; options.EnableTrace = 1; - ManagementClient management(session.CreateManagementClient("Test", {})); + ManagementClient management(session.CreateManagementClient("Test", options)); mockServer.StartListening(); @@ -103,16 +103,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { if (receiver.GetSourceName() != "$management" && receiver.GetSourceName() != "$cbs") { GTEST_LOG_(INFO) << "Rejecting message because it is for an unexpected node name."; - return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected( + auto rv = Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected( "test:Rejected", "Unknown message source."); + GTEST_LOG_(INFO) << "RV=" << rv; + return rv; } // If this is coming on the management node, we only support the Test operation. if (receiver.GetSourceName() == "$management" && incomingMessage.ApplicationProperties.at("operation") != "Test") { - GTEST_LOG_(INFO) << "Rejecting message because is for an unknown operation."; - return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected( + GTEST_LOG_(INFO) << "Rejecting message because it is for an unknown operation."; + auto rv = Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected( "amqp:status:rejected", "Unknown Request operation"); + GTEST_LOG_(INFO) << "RV=" << rv; + return rv; } return AmqpServerMock::OnMessageReceived(receiver, incomingMessage); } @@ -283,8 +287,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { auto response = management.ExecuteOperation("Test", "Type", "Locales", messageToSend); EXPECT_EQ(response.Status, ManagementOperationStatus::Error); - EXPECT_EQ(response.StatusCode, 0); - EXPECT_EQ(response.Description, "Error processing management operation."); + EXPECT_EQ(response.StatusCode, 500); + EXPECT_EQ(response.Description, "Received message statusCode value is not an int."); management.Close(); mockServer.StopListening(); @@ -298,7 +302,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { struct ManagementEventsHandler : public ManagementClientEvents { - void OnError() override { Error = true; } + void OnError(Azure::Core::Amqp::Models::_internal::AmqpError const&) override + { + Error = true; + } bool Error{false}; }; ManagementEventsHandler managementEvents; @@ -330,8 +337,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { auto response = management.ExecuteOperation("Test", "Type", "Locales", messageToSend); EXPECT_EQ(response.Status, ManagementOperationStatus::Error); - EXPECT_EQ(response.StatusCode, 0); - EXPECT_EQ(response.Description, "Error processing management operation."); + EXPECT_EQ(response.StatusCode, 500); + EXPECT_EQ( + response.Description, "Received message does not have a statusCode status code key."); EXPECT_TRUE(managementEvents.Error); management.Close(); @@ -410,8 +418,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { Azure::Core::Context::ApplicationContext.WithDeadline( std::chrono::system_clock::now() + std::chrono::seconds(10))); EXPECT_EQ(response.Status, ManagementOperationStatus::Error); - EXPECT_EQ(response.StatusCode, 0); - EXPECT_EQ(response.Description, ""); + EXPECT_EQ(response.StatusCode, 500); + EXPECT_EQ(response.Description, "Unknown Request operation"); management.Close(); mockServer.StopListening(); diff --git a/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp b/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp index 1f6532e36..1d9957f81 100644 --- a/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp +++ b/sdk/core/azure-core-amqp/test/ut/message_sender_receiver.cpp @@ -226,13 +226,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { receiverOptions.MessageTarget = messageTarget; receiverOptions.Name = name; receiverOptions.SettleMode = Azure::Core::Amqp::_internal::ReceiverSettleMode::First; - receiverOptions.DynamicAddress = messageSource.GetDynamic(); receiverOptions.EnableTrace = true; - auto receiver = std::make_unique(session.CreateMessageReceiver( - newLinkInstance, - static_cast(messageSource.GetAddress()), - receiverOptions, - this)); + auto receiver = std::make_unique( + session.CreateMessageReceiver(newLinkInstance, messageSource, receiverOptions, this)); GTEST_LOG_(INFO) << "Opening the message receiver."; receiver->Open(); m_messageReceiverQueue.CompleteOperation(std::move(receiver)); diff --git a/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp b/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp index 88ba8da39..a1f0a65a8 100644 --- a/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp +++ b/sdk/core/azure-core-amqp/test/ut/message_source_target.cpp @@ -33,12 +33,12 @@ TEST_F(TestSourceTarget, SimpleSourceTarget) { EXPECT_ANY_THROW(MessageSource source(AmqpValue{})); - AmqpValue val = AmqpArray(); + AmqpValue val = static_cast(AmqpArray()); EXPECT_ANY_THROW(MessageSource source{val}); } { EXPECT_ANY_THROW(MessageTarget target(AmqpValue{})); - AmqpValue val = AmqpArray(); + AmqpValue val = static_cast(AmqpArray()); EXPECT_ANY_THROW(MessageTarget target(val)); } } @@ -79,7 +79,7 @@ TEST_F(TestSourceTarget, TargetProperties) { MessageTargetOptions options; - options.Capabilities.push_back(AmqpSymbol{"Test"}); + options.Capabilities.push_back(static_cast(AmqpSymbol{"Test"})); MessageTarget target(options); EXPECT_EQ(1, target.GetCapabilities().size()); EXPECT_EQ(AmqpValueType::Symbol, target.GetCapabilities()[0].GetType()); @@ -238,7 +238,7 @@ TEST_F(TestSourceTarget, SourceProperties) { MessageSourceOptions options; - options.Capabilities.push_back(AmqpSymbol{"Test"}); + options.Capabilities.push_back(static_cast(AmqpSymbol{"Test"})); MessageSource source(options); EXPECT_EQ(1, source.GetCapabilities().size()); EXPECT_EQ(AmqpValueType::Symbol, source.GetCapabilities()[0].GetType()); @@ -374,7 +374,7 @@ TEST_F(TestSourceTarget, SourceProperties) { MessageSourceOptions options; - options.Outcomes.push_back(AmqpSymbol("Test")); + options.Outcomes.push_back(static_cast(AmqpSymbol("Test"))); MessageSource source(options); EXPECT_EQ(1, source.GetOutcomes().size()); EXPECT_EQ(AmqpValueType::Symbol, source.GetOutcomes().at(0).GetType()); diff --git a/sdk/core/azure-core-amqp/test/ut_uamqp/CMakeLists.txt b/sdk/core/azure-core-amqp/test/ut_uamqp/CMakeLists.txt index e6d2b3b53..2f787febe 100644 --- a/sdk/core/azure-core-amqp/test/ut_uamqp/CMakeLists.txt +++ b/sdk/core/azure-core-amqp/test/ut_uamqp/CMakeLists.txt @@ -13,6 +13,9 @@ include(AzureBuildTargetForCI) include(GoogleTest) find_package(uamqp CONFIG REQUIRED) +find_package(umock_c) +find_package(azure_macro_utils_c CONFIG REQUIRED) +find_package(azure_c_shared_utility CONFIG REQUIRED) # Unit Tests add_executable(azure-core-amqp-tests-uamqp @@ -39,7 +42,7 @@ endif() target_include_directories(azure-core-amqp-tests-uamqp PRIVATE Azure::azure-core-amqp) # Link test executable against gtest & gtest_main -target_link_libraries(azure-core-amqp-tests-uamqp PRIVATE GTest::gtest_main Azure::azure-core-amqp uamqp) +target_link_libraries(azure-core-amqp-tests-uamqp PRIVATE GTest::gtest_main Azure::azure-core-amqp uamqp umock_c azure_macro_utils_c) gtest_discover_tests( azure-core-amqp-tests-uamqp ) create_per_service_target_build(core azure-core-amqp-tests-uamqp) diff --git a/sdk/core/azure-core/test/ut/service_tracing_test.cpp b/sdk/core/azure-core/test/ut/service_tracing_test.cpp index b8d109d8b..37f5365f3 100644 --- a/sdk/core/azure-core/test/ut/service_tracing_test.cpp +++ b/sdk/core/azure-core/test/ut/service_tracing_test.cpp @@ -21,14 +21,14 @@ TEST(TracingContextFactory, ServiceTraceEnums) spanKind = SpanKind::Producer; spanKind = Azure::Core::Tracing::_internal::SpanKind::Server; int i = static_cast(spanKind); - i += 1; + (void)i; // silence clang 14 warning about unused variable i. } { SpanStatus spanStatus = SpanStatus::Unset; spanStatus = SpanStatus::Error; spanStatus = SpanStatus::Ok; int i = static_cast(spanStatus); - i += 1; + (void)i; // silence clang 14 warning about unused variable i. } Azure::Core::Tracing::_internal::CreateSpanOptions options; options.Kind = SpanKind::Internal;