Convert AMQP Management APIs to use MessageSender and MessageReceiver instead of uAMQP version. (#4706)
* Switched management implementation to use message sender and receiver instead of uAMQP implementation. * Made value conversion to AmqpValue explicit to work around what appears to be a gcc9 bug Co-authored-by: Ahson Khan <ahkha@microsoft.com> * pull request feedback --------- Co-authored-by: Ahson Khan <ahkha@microsoft.com>
This commit is contained in:
parent
9d173dcd20
commit
5a9106f2a5
@ -3,6 +3,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "models/amqp_error.hpp"
|
||||
#include "models/amqp_message.hpp"
|
||||
#include "session.hpp"
|
||||
|
||||
@ -92,8 +93,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
|
||||
|
||||
public:
|
||||
/** @brief Called when an error occurs.
|
||||
*
|
||||
* @param error - the error which occurred.
|
||||
*
|
||||
*/
|
||||
virtual void OnError() = 0;
|
||||
virtual void OnError(Models::_internal::AmqpError const& error) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@ -41,32 +41,44 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
|
||||
|
||||
class MessageReceiver;
|
||||
|
||||
struct MessageReceiverOptions
|
||||
struct MessageReceiverOptions final
|
||||
{
|
||||
/** @brief The name of the link associated with the message sender.
|
||||
*
|
||||
* Links are named so that they can be recovered when communication is interrupted. Link names
|
||||
* MUST uniquely identify the link amongst all links of the same direction between the two
|
||||
* participating containers. Link names are only used when attaching a link, so they can be
|
||||
* arbitrarily long without a significant penalty.
|
||||
*
|
||||
*/
|
||||
|
||||
std::string Name;
|
||||
std::vector<std::string> AuthenticationScopes;
|
||||
|
||||
/** @brief The settle mode for the link associated with the message sender.
|
||||
*
|
||||
* This field indicates how the deliveries sent over the link SHOULD be settled. When this
|
||||
* field is set to "mixed", the unsettled map MUST be sent even if it is empty. When this
|
||||
* field is set to "settled", the value of the unsettled map MUST NOT be sent. See
|
||||
* http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html#doc-idp145616
|
||||
* for more details.
|
||||
*
|
||||
*/
|
||||
ReceiverSettleMode SettleMode{ReceiverSettleMode::First};
|
||||
|
||||
/** @brief The target for the link associated with the message receiver. */
|
||||
Models::_internal::MessageTarget MessageTarget;
|
||||
bool EnableTrace{false};
|
||||
|
||||
/** @brief The initial delivery count for the link associated with the message receiver. */
|
||||
Nullable<uint32_t> InitialDeliveryCount;
|
||||
|
||||
/** @brief The Maximum message size for the link associated with the message receiver. */
|
||||
Nullable<uint64_t> MaxMessageSize;
|
||||
|
||||
bool Batching{false};
|
||||
std::chrono::seconds BatchMaxAge{std::chrono::seconds(5)};
|
||||
std::vector<std::string> Capabilities;
|
||||
uint32_t Credit{1};
|
||||
LinkDurability Durability{};
|
||||
bool DynamicAddress{false};
|
||||
ExpiryPolicy SenderExpiryPolicy{};
|
||||
ExpiryPolicy ReceiverExpiryPolicy{};
|
||||
std::chrono::seconds ExpiryTimeout{std::chrono::seconds(0)};
|
||||
// LinkFilter
|
||||
bool ManualCredits{};
|
||||
Models::AmqpValue Properties;
|
||||
/** @brief If true, the message receiver will generate low level events */
|
||||
bool EnableTrace{false};
|
||||
|
||||
std::vector<std::string> SenderCapabilities;
|
||||
LinkDurability SenderDurability{};
|
||||
std::chrono::seconds SenderExpiryTimeout{std::chrono::seconds(0)};
|
||||
/** @brief If true, require that the message sender be authenticated with the service. */
|
||||
bool AuthenticationRequired{true};
|
||||
};
|
||||
|
||||
class MessageReceiverEvents {
|
||||
@ -86,6 +98,28 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
|
||||
virtual void OnMessageReceiverDisconnected(Models::_internal::AmqpError const& error) = 0;
|
||||
};
|
||||
|
||||
/** @brief MessageReceiver
|
||||
*
|
||||
* The MessageReceiver class is responsible for receiving messages from a remote AMQP node. It is
|
||||
* constructed by the Session::CreateMessageReceiver method.
|
||||
*
|
||||
* The message receiver operates in one of two possible models.
|
||||
*
|
||||
* In the first model, the message receiver caller registers for incoming messages by providing a
|
||||
* MessageReceiverEvents callback object, and processes incoming messages in the OnMessageReceived
|
||||
* method.
|
||||
*
|
||||
* In the second model, the caller calls the WaitForIncomingMessage method to wait for the next
|
||||
* incoming message.
|
||||
*
|
||||
* The primary difference between the two models is that the first model allows the caller to
|
||||
* alter the disposition of a message when it is received, the second model accepts all incoming
|
||||
* messages.
|
||||
*
|
||||
* @remarks If the caller provides a MessageReceiverEvents callback, then the
|
||||
* WaitForIncomingMessage API will throw an exception.
|
||||
*
|
||||
*/
|
||||
class MessageReceiver final {
|
||||
public:
|
||||
~MessageReceiver() noexcept;
|
||||
@ -95,11 +129,35 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
|
||||
MessageReceiver(MessageReceiver&&) = default;
|
||||
MessageReceiver& operator=(MessageReceiver&&) = default;
|
||||
|
||||
/** @brief Opens the message receiver.
|
||||
*
|
||||
* @param context The context for cancelling operations.
|
||||
*/
|
||||
void Open(Context const& context = {});
|
||||
|
||||
/** @brief Closes the message receiver.
|
||||
*
|
||||
*/
|
||||
void Close();
|
||||
|
||||
/** @brief Gets the name of the underlying link.
|
||||
*
|
||||
* @return The name of the underlying link object.
|
||||
*/
|
||||
std::string GetLinkName() const;
|
||||
|
||||
/** @brief Gets the Address of the message receiver's source node.
|
||||
*
|
||||
* @return The name of the source node.
|
||||
*/
|
||||
std::string GetSourceName() const;
|
||||
|
||||
/** @brief Waits until a message has been received.
|
||||
*
|
||||
* @param context The context for cancelling operations.
|
||||
*
|
||||
* @return A pair of the received message and the error if any.
|
||||
*/
|
||||
std::pair<Azure::Nullable<Models::AmqpMessage>, Models::_internal::AmqpError>
|
||||
WaitForIncomingMessage(Context const& context = {});
|
||||
|
||||
|
||||
@ -102,6 +102,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
|
||||
|
||||
/** @brief If true, the message sender will log trace events. */
|
||||
bool EnableTrace{false};
|
||||
|
||||
/** @brief If true, require that the message sender be authenticated with the service. */
|
||||
bool AuthenticationRequired{true};
|
||||
};
|
||||
|
||||
class MessageSender final {
|
||||
|
||||
@ -579,7 +579,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
|
||||
public:
|
||||
/** @brief Convert this collection type to an AMQP value.*/
|
||||
operator AmqpValue() const { return static_cast<UniqueAmqpValueHandle>(*this).get(); }
|
||||
explicit operator AmqpValue() const
|
||||
{
|
||||
return static_cast<UniqueAmqpValueHandle>(*this).get();
|
||||
}
|
||||
|
||||
/** @brief Returns the size of the underlying value.*/
|
||||
inline typename T::size_type size() const { return m_value.size(); }
|
||||
@ -787,7 +790,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
/**
|
||||
* @brief Convert an existing AmqpSymbol to an AmqpValue.
|
||||
*/
|
||||
operator AmqpValue() const;
|
||||
explicit operator AmqpValue() const;
|
||||
|
||||
/**
|
||||
* @brief Convert an AmqpSymbol instance to a uAMQP AMQP_VALUE.
|
||||
@ -839,6 +842,19 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
*/
|
||||
AmqpComposite(AMQP_VALUE_DATA_TAG* const value);
|
||||
|
||||
/** @brief Compare this AmqpComposite value with another.
|
||||
*
|
||||
* @param that - the AmqpComposite to compare with.
|
||||
*/
|
||||
bool operator==(AmqpComposite const& that) const
|
||||
{
|
||||
if (GetDescriptor() == that.GetDescriptor())
|
||||
{
|
||||
return m_value == that.m_value;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** @brief Returns the descriptor for this composite type.
|
||||
*
|
||||
* @returns The descriptor for this composite type.
|
||||
@ -858,7 +874,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
/**
|
||||
* @brief Convert an existing AmqpComposite to an AmqpValue.
|
||||
*/
|
||||
operator AmqpValue() const { return static_cast<_detail::UniqueAmqpValueHandle>(*this).get(); }
|
||||
explicit operator AmqpValue() const
|
||||
{
|
||||
return static_cast<_detail::UniqueAmqpValueHandle>(*this).get();
|
||||
}
|
||||
|
||||
private:
|
||||
AmqpValue m_descriptor;
|
||||
@ -905,10 +924,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
*/
|
||||
AmqpDescribed(AMQP_VALUE_DATA_TAG* const value);
|
||||
|
||||
/** @brief Compare this AmqpDescribed value with another.
|
||||
*
|
||||
* @param that - the AmqpDescribed to compare with.
|
||||
*/
|
||||
bool operator==(AmqpDescribed const& that) const
|
||||
{
|
||||
if (GetDescriptor() == that.GetDescriptor())
|
||||
{
|
||||
return GetValue() == that.GetValue();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Convert an existing AmqpComposite to an AmqpValue.
|
||||
*/
|
||||
operator AmqpValue const() const;
|
||||
explicit operator AmqpValue const() const;
|
||||
|
||||
/** @brief Returns the descriptor for this composite type.
|
||||
*
|
||||
@ -944,5 +976,4 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
AmqpValue m_descriptor;
|
||||
AmqpValue m_value;
|
||||
};
|
||||
|
||||
}}}} // namespace Azure::Core::Amqp::Models
|
||||
|
||||
@ -91,7 +91,7 @@ struct EventHubPartitionProperties final
|
||||
bool IsEmpty{};
|
||||
};
|
||||
|
||||
EventHubPartitionProperties GetPartitionProperties(
|
||||
std::tuple<bool, EventHubPartitionProperties> GetPartitionProperties(
|
||||
Azure::Core::Amqp::_internal::Session const& session,
|
||||
std::string const& eventHubName,
|
||||
std::string const& partitionId)
|
||||
@ -119,9 +119,11 @@ EventHubPartitionProperties GetPartitionProperties(
|
||||
message);
|
||||
|
||||
EventHubPartitionProperties properties;
|
||||
bool error{false};
|
||||
if (result.Status == Azure::Core::Amqp::_internal::ManagementOperationStatus::Error)
|
||||
{
|
||||
std::cerr << "Error: " << result.Message.ApplicationProperties["status-description"];
|
||||
std::cerr << "Error: " << result.Description;
|
||||
error = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -150,7 +152,7 @@ EventHubPartitionProperties GetPartitionProperties(
|
||||
}
|
||||
managementClient.Close();
|
||||
|
||||
return properties;
|
||||
return std::make_tuple(error, properties);
|
||||
}
|
||||
|
||||
int main()
|
||||
@ -194,16 +196,21 @@ int main()
|
||||
{
|
||||
std::cout << "Partition: " << partition << std::endl;
|
||||
auto partitionProperties = GetPartitionProperties(session, eventhubsEntity, partition);
|
||||
std::cout << "Partition properties: " << std::endl;
|
||||
std::cout << " Name: " << partitionProperties.Name << std::endl;
|
||||
std::cout << " PartitionId: " << partitionProperties.PartitionId << std::endl;
|
||||
std::cout << " BeginningSequenceNumber: " << partitionProperties.BeginningSequenceNumber
|
||||
<< std::endl;
|
||||
std::cout << " LastEnqueuedSequenceNumber: " << partitionProperties.LastEnqueuedSequenceNumber
|
||||
<< std::endl;
|
||||
std::cout << " LastEnqueuedOffset: " << partitionProperties.LastEnqueuedOffset << std::endl;
|
||||
std::cout << " LastEnqueuedTimeUtc: " << partitionProperties.LastEnqueuedTimeUtc.ToString()
|
||||
<< std::endl;
|
||||
std::cout << " IsEmpty: " << std::boolalpha << partitionProperties.IsEmpty << std::endl;
|
||||
if (!std::get<0>(partitionProperties))
|
||||
{
|
||||
std::cout << "Partition properties: " << std::endl;
|
||||
std::cout << " Name: " << std::get<1>(partitionProperties).Name << std::endl;
|
||||
std::cout << " PartitionId: " << std::get<1>(partitionProperties).PartitionId << std::endl;
|
||||
std::cout << " BeginningSequenceNumber: "
|
||||
<< std::get<1>(partitionProperties).BeginningSequenceNumber << std::endl;
|
||||
std::cout << " LastEnqueuedSequenceNumber: "
|
||||
<< std::get<1>(partitionProperties).LastEnqueuedSequenceNumber << std::endl;
|
||||
std::cout << " LastEnqueuedOffset: " << std::get<1>(partitionProperties).LastEnqueuedOffset
|
||||
<< std::endl;
|
||||
std::cout << " LastEnqueuedTimeUtc: "
|
||||
<< std::get<1>(partitionProperties).LastEnqueuedTimeUtc.ToString() << std::endl;
|
||||
std::cout << " IsEmpty: " << std::boolalpha << std::get<1>(partitionProperties).IsEmpty
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@
|
||||
|
||||
#include "azure/core/amqp/connection.hpp"
|
||||
#include "azure/core/amqp/models/amqp_message.hpp"
|
||||
#include "azure/core/amqp/models/messaging_values.hpp"
|
||||
#include "azure/core/amqp/session.hpp"
|
||||
#include "private/connection_impl.hpp"
|
||||
#include "private/management_impl.hpp"
|
||||
@ -23,11 +24,13 @@
|
||||
using namespace Azure::Core::Diagnostics::_internal;
|
||||
using namespace Azure::Core::Diagnostics;
|
||||
|
||||
#if UAMQP_MANAGEMENT_CLIENT
|
||||
void Azure::Core::_internal::UniqueHandleHelper<AMQP_MANAGEMENT_INSTANCE_TAG>::FreeAmqpManagement(
|
||||
AMQP_MANAGEMENT_HANDLE value)
|
||||
{
|
||||
amqp_management_destroy(value);
|
||||
}
|
||||
#endif
|
||||
|
||||
namespace Azure { namespace Core { namespace Amqp { namespace _internal {
|
||||
|
||||
@ -71,9 +74,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
}
|
||||
ManagementClientImpl::~ManagementClientImpl() noexcept { m_eventHandler = nullptr; }
|
||||
|
||||
#if UAMQP_MANAGEMENT_CLIENT
|
||||
void ManagementClientImpl::CreateManagementClient()
|
||||
{
|
||||
|
||||
m_management.reset(amqp_management_create(*m_session, m_options.ManagementNodeName.c_str()));
|
||||
if (!m_management)
|
||||
{
|
||||
@ -100,13 +103,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
_internal::ManagementOpenStatus ManagementClientImpl::Open(Context const& context)
|
||||
{
|
||||
/** Authentication needs to happen *before* the management object is created. */
|
||||
m_session->AuthenticateIfNeeded(
|
||||
m_managementEntityPath + "/" + m_options.ManagementNodeName, context);
|
||||
|
||||
#if UAMQP_MANAGEMENT_CLIENT
|
||||
CreateManagementClient();
|
||||
|
||||
if (amqp_management_open_async(
|
||||
@ -134,6 +138,51 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
}
|
||||
}
|
||||
throw Azure::Core::OperationCancelledException("Management Open operation was cancelled.");
|
||||
#else
|
||||
{
|
||||
_internal::MessageSenderOptions messageSenderOptions;
|
||||
messageSenderOptions.EnableTrace = m_options.EnableTrace;
|
||||
messageSenderOptions.MessageSource = m_options.ManagementNodeName;
|
||||
messageSenderOptions.Name = m_options.ManagementNodeName + "-sender";
|
||||
messageSenderOptions.AuthenticationRequired = false;
|
||||
|
||||
m_messageSender = std::make_shared<MessageSenderImpl>(
|
||||
m_session, m_options.ManagementNodeName, messageSenderOptions, this);
|
||||
}
|
||||
{
|
||||
_internal::MessageReceiverOptions messageReceiverOptions;
|
||||
messageReceiverOptions.EnableTrace = m_options.EnableTrace;
|
||||
messageReceiverOptions.MessageTarget = m_options.ManagementNodeName;
|
||||
messageReceiverOptions.Name = m_options.ManagementNodeName + "-receiver";
|
||||
messageReceiverOptions.AuthenticationRequired = false;
|
||||
|
||||
m_messageReceiver = std::make_shared<MessageReceiverImpl>(
|
||||
m_session, m_options.ManagementNodeName, messageReceiverOptions, this);
|
||||
}
|
||||
|
||||
// Now open the message sender and receiver.
|
||||
SetState(ManagementState::Opening);
|
||||
m_messageSender->Open(context);
|
||||
m_messageReceiver->Open(context);
|
||||
|
||||
// And finally, wait for the message sender and receiver to finish opening before we return.
|
||||
auto result = m_openCompleteQueue.WaitForPolledResult(context, *m_session->GetConnection());
|
||||
if (result)
|
||||
{
|
||||
// If the message sender or receiver failed to open, we need to close them
|
||||
_internal::ManagementOpenStatus rv = std::get<0>(*result);
|
||||
if (rv != _internal::ManagementOpenStatus::Ok)
|
||||
{
|
||||
m_messageSender->Close();
|
||||
m_messageSenderOpen = false;
|
||||
m_messageReceiver->Close();
|
||||
m_messageReceiverOpen = false;
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
// If result is null, then it means that the context was cancelled.
|
||||
throw Azure::Core::OperationCancelledException("Management Open operation was cancelled.");
|
||||
#endif // UAMQP_MANAGEMENT_CLIENT
|
||||
}
|
||||
|
||||
_internal::ManagementOperationResult ManagementClientImpl::ExecuteOperation(
|
||||
@ -148,6 +197,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
{
|
||||
messageToSend.ApplicationProperties["security_token"] = Models::AmqpValue{token};
|
||||
}
|
||||
#if UAMQP_MANAGEMENT_CLIENT
|
||||
if (!amqp_management_execute_operation_async(
|
||||
m_management.get(),
|
||||
operationToPerform.c_str(),
|
||||
@ -174,10 +224,83 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
{
|
||||
throw Azure::Core::OperationCancelledException("Management operation cancelled.");
|
||||
}
|
||||
#else
|
||||
messageToSend.ApplicationProperties.emplace("operation", operationToPerform);
|
||||
messageToSend.ApplicationProperties.emplace("type", typeOfOperation);
|
||||
if (!locales.empty())
|
||||
{
|
||||
messageToSend.ApplicationProperties.emplace("locales", locales);
|
||||
}
|
||||
messageToSend.Properties.MessageId = m_nextMessageId;
|
||||
m_expectedMessageId = m_nextMessageId;
|
||||
m_sendCompleted = false;
|
||||
m_nextMessageId++;
|
||||
|
||||
m_messageSender->QueueSend(
|
||||
messageToSend,
|
||||
[&](_internal::MessageSendStatus sendStatus, Models::AmqpValue const& deliveryState) {
|
||||
m_sendCompleted = true;
|
||||
Log::Stream(Logger::Level::Informational)
|
||||
<< "Management operation send complete. Status: " << static_cast<int>(sendStatus)
|
||||
<< ", DeliveryState: " << deliveryState;
|
||||
if (sendStatus != _internal::MessageSendStatus::Ok)
|
||||
{
|
||||
std::string errorDescription = "Send failed.";
|
||||
auto deliveryStateAsList{deliveryState.AsList()};
|
||||
Models::AmqpValue firstState{deliveryStateAsList[0]};
|
||||
ERROR_HANDLE errorHandle;
|
||||
if (!amqpvalue_get_error(firstState, &errorHandle))
|
||||
{
|
||||
Models::_internal::UniqueAmqpErrorHandle uniqueError{
|
||||
errorHandle}; // This will free the error handle when it goes out of scope.
|
||||
Models::_internal::AmqpError error{
|
||||
Models::_internal::AmqpErrorFactory::FromUamqp(errorHandle)};
|
||||
errorDescription = error.Description;
|
||||
}
|
||||
m_messageQueue.CompleteOperation(
|
||||
_internal::ManagementOperationStatus::Error,
|
||||
500,
|
||||
errorDescription,
|
||||
Models::AmqpMessage{});
|
||||
}
|
||||
},
|
||||
context);
|
||||
auto result = m_messageQueue.WaitForPolledResult(context, *m_session->GetConnection());
|
||||
if (result)
|
||||
{
|
||||
_internal::ManagementOperationResult rv;
|
||||
rv.Status = std::get<0>(*result);
|
||||
rv.StatusCode = std::get<1>(*result);
|
||||
rv.Description = std::get<2>(*result);
|
||||
rv.Message = std::get<3>(*result);
|
||||
return rv;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Azure::Core::OperationCancelledException("Management operation cancelled.");
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void ManagementClientImpl::Close() { amqp_management_close(m_management.get()); }
|
||||
void ManagementClientImpl::SetState(ManagementState newState) { m_state = newState; }
|
||||
|
||||
void ManagementClientImpl::Close()
|
||||
{
|
||||
#if UAMQP_MANAGEMENT_CLIENT
|
||||
amqp_management_close(m_management.get());
|
||||
#endif
|
||||
SetState(ManagementState::Closing);
|
||||
if (m_messageSender && m_messageSenderOpen)
|
||||
{
|
||||
m_messageSender->Close();
|
||||
}
|
||||
if (m_messageReceiver && m_messageReceiverOpen)
|
||||
{
|
||||
m_messageReceiver->Close();
|
||||
}
|
||||
}
|
||||
|
||||
#if UAMQP_MANAGEMENT_IMPLEMENTATION
|
||||
void ManagementClientImpl::OnOpenCompleteFn(void* context, AMQP_MANAGEMENT_OPEN_RESULT openResult)
|
||||
{
|
||||
ManagementClientImpl* management = static_cast<ManagementClientImpl*>(context);
|
||||
@ -188,7 +311,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
}
|
||||
management->m_openCompleteQueue.CompleteOperation(openResult);
|
||||
}
|
||||
|
||||
void ManagementClientImpl::OnManagementErrorFn(void* context)
|
||||
{
|
||||
ManagementClientImpl* management = static_cast<ManagementClientImpl*>(context);
|
||||
@ -249,5 +371,328 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
throw std::runtime_error("Unknown management status."); // LCOV_EXCL_LINE
|
||||
}
|
||||
}
|
||||
#else
|
||||
|
||||
void ManagementClientImpl::OnMessageSenderStateChanged(
|
||||
_internal::MessageSender const&,
|
||||
_internal::MessageSenderState newState,
|
||||
_internal::MessageSenderState oldState)
|
||||
{
|
||||
if (newState == oldState)
|
||||
{
|
||||
Log::Stream(Logger::Level::Verbose)
|
||||
<< "OnMessageSenderStateChanged: newState == oldState" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
switch (m_state)
|
||||
{
|
||||
case ManagementState::Opening:
|
||||
switch (newState)
|
||||
{
|
||||
// If the message sender is opening, we don't need to do anything.
|
||||
case _internal::MessageSenderState::Opening:
|
||||
break;
|
||||
// If the message sender is open, remember it. If the message receiver is also
|
||||
// open, complete the outstanding open.
|
||||
case _internal::MessageSenderState::Open:
|
||||
m_messageSenderOpen = true;
|
||||
if (m_messageReceiverOpen)
|
||||
{
|
||||
SetState(ManagementState::Open);
|
||||
m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Ok);
|
||||
}
|
||||
break;
|
||||
// If the message sender is transitioning to an error or state other than open,
|
||||
// it's an error.
|
||||
default:
|
||||
case _internal::MessageSenderState::Idle:
|
||||
case _internal::MessageSenderState::Closing:
|
||||
case _internal::MessageSenderState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message Sender Changed State to " << static_cast<int>(newState)
|
||||
<< " while management client is opening";
|
||||
SetState(ManagementState::Closing);
|
||||
m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Error);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case ManagementState::Open:
|
||||
switch (newState)
|
||||
{
|
||||
// If the message sender goes to a non-open state, it's an error.
|
||||
default:
|
||||
case _internal::MessageSenderState::Idle:
|
||||
case _internal::MessageSenderState::Closing:
|
||||
case _internal::MessageSenderState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message Sender Changed State to " << static_cast<int>(newState)
|
||||
<< " while management client is open";
|
||||
SetState(ManagementState::Closing);
|
||||
if (m_eventHandler)
|
||||
{
|
||||
m_eventHandler->OnError(Models::_internal::AmqpError{});
|
||||
}
|
||||
break;
|
||||
// Ignore message sender open changes.
|
||||
case _internal::MessageSenderState::Open:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case ManagementState::Closing:
|
||||
switch (newState)
|
||||
{
|
||||
// If the message sender goes to a non-open state, it's an error.
|
||||
default:
|
||||
case _internal::MessageSenderState::Open:
|
||||
case _internal::MessageSenderState::Opening:
|
||||
case _internal::MessageSenderState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message Sender Changed State to " << static_cast<int>(newState)
|
||||
<< " while management client is closing";
|
||||
SetState(ManagementState::Closing);
|
||||
if (m_eventHandler)
|
||||
{
|
||||
m_eventHandler->OnError(Models::_internal::AmqpError{});
|
||||
}
|
||||
break;
|
||||
// Ignore message sender closing or idle state changes.
|
||||
case _internal::MessageSenderState::Idle:
|
||||
case _internal::MessageSenderState::Closing:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case ManagementState::Idle:
|
||||
case ManagementState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message sender state changed to " << static_cast<int>(newState)
|
||||
<< " when management client is in the error state, ignoring.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void ManagementClientImpl::OnMessageSenderDisconnected(Models::_internal::AmqpError const& error)
|
||||
{
|
||||
Log::Stream(Logger::Level::Error) << "Message sender disconnected: " << error << std::endl;
|
||||
SetState(ManagementState::Error);
|
||||
if (m_eventHandler)
|
||||
{
|
||||
m_eventHandler->OnError(error);
|
||||
}
|
||||
}
|
||||
|
||||
void ManagementClientImpl::OnMessageReceiverStateChanged(
|
||||
_internal::MessageReceiver const&,
|
||||
_internal::MessageReceiverState newState,
|
||||
_internal::MessageReceiverState oldState)
|
||||
{
|
||||
if (newState == oldState)
|
||||
{
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "OnMessageReceiverStateChanged: newState == oldState" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
switch (m_state)
|
||||
{
|
||||
case ManagementState::Opening:
|
||||
switch (newState)
|
||||
{
|
||||
// If the message sender is opening, we don't need to do anything.
|
||||
case _internal::MessageReceiverState::Opening:
|
||||
break;
|
||||
// If the message receiver is open, remember it. If the message sender is also
|
||||
// open, complete the outstanding open.
|
||||
case _internal::MessageReceiverState::Open:
|
||||
m_messageReceiverOpen = true;
|
||||
if (m_messageSenderOpen)
|
||||
{
|
||||
SetState(ManagementState::Open);
|
||||
m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Ok);
|
||||
}
|
||||
break;
|
||||
// If the message receiver is transitioning to an error or state other than open,
|
||||
// it's an error.
|
||||
default:
|
||||
case _internal::MessageReceiverState::Idle:
|
||||
case _internal::MessageReceiverState::Closing:
|
||||
case _internal::MessageReceiverState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message Receiver Changed State to " << static_cast<int>(newState)
|
||||
<< " while management client is opening";
|
||||
SetState(ManagementState::Closing);
|
||||
m_openCompleteQueue.CompleteOperation(_internal::ManagementOpenStatus::Error);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case ManagementState::Open:
|
||||
switch (newState)
|
||||
{
|
||||
// If the message sender goes to a non-open state, it's an error.
|
||||
default:
|
||||
case _internal::MessageReceiverState::Idle:
|
||||
case _internal::MessageReceiverState::Closing:
|
||||
case _internal::MessageReceiverState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message Sender Changed State to " << static_cast<int>(newState)
|
||||
<< " while management client is open";
|
||||
SetState(ManagementState::Closing);
|
||||
if (m_eventHandler)
|
||||
{
|
||||
m_eventHandler->OnError(Models::_internal::AmqpError{});
|
||||
}
|
||||
break;
|
||||
// Ignore message sender open changes.
|
||||
case _internal::MessageReceiverState::Open:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case ManagementState::Closing:
|
||||
switch (newState)
|
||||
{
|
||||
// If the message sender goes to a non-open state, it's an error.
|
||||
default:
|
||||
case _internal::MessageReceiverState::Open:
|
||||
case _internal::MessageReceiverState::Opening:
|
||||
case _internal::MessageReceiverState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message Sender Changed State to " << static_cast<int>(newState)
|
||||
<< " while management client is closing";
|
||||
SetState(ManagementState::Closing);
|
||||
if (m_eventHandler)
|
||||
{
|
||||
m_eventHandler->OnError(Models::_internal::AmqpError{});
|
||||
}
|
||||
break;
|
||||
// Ignore message sender closing or idle state changes.
|
||||
case _internal::MessageReceiverState::Idle:
|
||||
case _internal::MessageReceiverState::Closing:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case ManagementState::Idle:
|
||||
case ManagementState::Error:
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Message sender state changed to " << static_cast<int>(newState)
|
||||
<< " when management client is in the error state, ignoring.";
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Models::AmqpValue ManagementClientImpl::IndicateError(
|
||||
std::string const& condition,
|
||||
std::string const& description)
|
||||
{
|
||||
Log::Stream(Logger::Level::Error)
|
||||
<< "Indicate Management Error: " << condition << " " << description;
|
||||
if (m_eventHandler)
|
||||
{
|
||||
// Let external callers know that the error was triggered.
|
||||
Models::_internal::AmqpError error;
|
||||
error.Condition = Models::_internal::AmqpErrorCondition(condition);
|
||||
error.Description = "Message Delivery Rejected: " + description;
|
||||
m_eventHandler->OnError(error);
|
||||
}
|
||||
|
||||
// Complete any outstanding receives with an error.
|
||||
m_messageQueue.CompleteOperation(
|
||||
_internal::ManagementOperationStatus::Error, 500, description, Models::AmqpMessage());
|
||||
|
||||
return Models::_internal::Messaging::DeliveryRejected(condition, description);
|
||||
}
|
||||
|
||||
Models::AmqpValue ManagementClientImpl::OnMessageReceived(
|
||||
_internal::MessageReceiver const&,
|
||||
Models::AmqpMessage const& message)
|
||||
{
|
||||
if (message.ApplicationProperties.empty())
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message does not have application properties.");
|
||||
}
|
||||
if (!message.Properties.CorrelationId.HasValue())
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message correlation ID not found.");
|
||||
}
|
||||
else if (message.Properties.CorrelationId.Value().GetType() != Models::AmqpValueType::Ulong)
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message correlation ID is not a ulong.");
|
||||
}
|
||||
uint64_t correlationId = message.Properties.CorrelationId.Value();
|
||||
|
||||
auto statusCodeMap = message.ApplicationProperties.find(m_options.ExpectedStatusCodeKeyName);
|
||||
if (statusCodeMap == message.ApplicationProperties.end())
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message does not have a " + m_options.ExpectedStatusCodeKeyName
|
||||
+ " status code key.");
|
||||
}
|
||||
else if (statusCodeMap->second.GetType() != Models::AmqpValueType::Int)
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message " + m_options.ExpectedStatusCodeKeyName + " value is not an int.");
|
||||
}
|
||||
int32_t statusCode = statusCodeMap->second;
|
||||
|
||||
// If the message has a status description, remember it.
|
||||
auto statusDescription
|
||||
= message.ApplicationProperties.find(m_options.ExpectedStatusDescriptionKeyName);
|
||||
std::string description;
|
||||
if (statusDescription != message.ApplicationProperties.end())
|
||||
{
|
||||
if (statusDescription->second.GetType() != Models::AmqpValueType::String)
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message " + m_options.ExpectedStatusDescriptionKeyName
|
||||
+ " value is not a string.");
|
||||
}
|
||||
description = static_cast<std::string>(statusDescription->second);
|
||||
}
|
||||
|
||||
if (correlationId != m_expectedMessageId)
|
||||
{
|
||||
return IndicateError(
|
||||
Models::_internal::AmqpErrorCondition::InternalError.ToString(),
|
||||
"Received message correlation ID does not match request ID.");
|
||||
}
|
||||
if (!m_sendCompleted)
|
||||
{
|
||||
Log::Stream(Logger::Level::Error) << "Received message before send completed.";
|
||||
}
|
||||
|
||||
// AMQP management statusCode values are [RFC
|
||||
// 2616](https://www.rfc-editor.org/rfc/rfc2616#section-6.1.1) status codes.
|
||||
if ((statusCode < 200) || (statusCode > 299))
|
||||
{
|
||||
m_messageQueue.CompleteOperation(
|
||||
_internal::ManagementOperationStatus::FailedBadStatus, statusCode, description, message);
|
||||
}
|
||||
else
|
||||
{
|
||||
m_messageQueue.CompleteOperation(
|
||||
_internal::ManagementOperationStatus::Ok, statusCode, description, message);
|
||||
}
|
||||
return Models::_internal::Messaging::DeliveryAccepted();
|
||||
}
|
||||
|
||||
void ManagementClientImpl::OnMessageReceiverDisconnected(
|
||||
Models::_internal::AmqpError const& error)
|
||||
{
|
||||
Log::Stream(Logger::Level::Error) << "Message receiver disconnected: " << error << std::endl;
|
||||
SetState(ManagementState::Error);
|
||||
if (m_eventHandler)
|
||||
{
|
||||
m_eventHandler->OnError(error);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
}}}} // namespace Azure::Core::Amqp::_detail
|
||||
|
||||
@ -161,6 +161,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
std::pair<Azure::Nullable<Models::AmqpMessage>, Models::_internal::AmqpError>
|
||||
MessageReceiverImpl::WaitForIncomingMessage(Context const& context)
|
||||
{
|
||||
if (m_eventHandler)
|
||||
{
|
||||
throw std::runtime_error("Cannot call WaitForIncomingMessage when using an event handler.");
|
||||
}
|
||||
auto result = m_messageQueue.WaitForPolledResult(context, *m_session->GetConnection());
|
||||
if (result)
|
||||
{
|
||||
@ -261,7 +265,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
|
||||
void MessageReceiverImpl::Open(Azure::Core::Context const& context)
|
||||
{
|
||||
m_session->AuthenticateIfNeeded(static_cast<std::string>(m_source.GetAddress()), context);
|
||||
if (m_options.AuthenticationRequired)
|
||||
{
|
||||
m_session->AuthenticateIfNeeded(static_cast<std::string>(m_source.GetAddress()), context);
|
||||
}
|
||||
|
||||
// Once we've authenticated the connection, establish the link and receiver.
|
||||
// We cannot do this before authenticating the client.
|
||||
|
||||
@ -188,9 +188,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
|
||||
void MessageSenderImpl::Open(Context const& context)
|
||||
{
|
||||
// If we need to authenticate with either ServiceBus or BearerToken, now is the time to do
|
||||
// it.
|
||||
m_session->AuthenticateIfNeeded(static_cast<std::string>(m_target.GetAddress()), context);
|
||||
if (m_options.AuthenticationRequired)
|
||||
{
|
||||
// If we need to authenticate with either ServiceBus or BearerToken, now is the time to do
|
||||
// it.
|
||||
m_session->AuthenticateIfNeeded(static_cast<std::string>(m_target.GetAddress()), context);
|
||||
}
|
||||
|
||||
if (m_link == nullptr)
|
||||
{
|
||||
|
||||
@ -8,6 +8,7 @@
|
||||
#include "azure/core/amqp/session.hpp"
|
||||
#include "connection_impl.hpp"
|
||||
#include "message_receiver_impl.hpp"
|
||||
#include "message_sender_impl.hpp"
|
||||
#include "session_impl.hpp"
|
||||
|
||||
#include <azure/core/credentials/credentials.hpp>
|
||||
@ -15,8 +16,9 @@
|
||||
#include <azure_uamqp_c/amqp_management.h>
|
||||
|
||||
#include <memory>
|
||||
#include <queue>
|
||||
#include <vector>
|
||||
|
||||
#if UAMQP_MANAGEMENT_IMPLEMENTATION
|
||||
template <> struct Azure::Core::_internal::UniqueHandleHelper<AMQP_MANAGEMENT_INSTANCE_TAG>
|
||||
{
|
||||
static void FreeAmqpManagement(AMQP_MANAGEMENT_INSTANCE_TAG* obj);
|
||||
@ -27,6 +29,7 @@ template <> struct Azure::Core::_internal::UniqueHandleHelper<AMQP_MANAGEMENT_IN
|
||||
|
||||
using UniqueAmqpManagementHandle
|
||||
= Azure::Core::_internal::UniqueHandle<AMQP_MANAGEMENT_INSTANCE_TAG>;
|
||||
#endif
|
||||
|
||||
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
|
||||
@ -45,7 +48,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
}
|
||||
};
|
||||
|
||||
class ManagementClientImpl final : public ::std::enable_shared_from_this<ManagementClientImpl> {
|
||||
class ManagementClientImpl final : public ::std::enable_shared_from_this<ManagementClientImpl>,
|
||||
public _internal::MessageSenderEvents,
|
||||
public _internal::MessageReceiverEvents {
|
||||
public:
|
||||
ManagementClientImpl(
|
||||
std::shared_ptr<SessionImpl> session,
|
||||
@ -76,15 +81,46 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
Context const& context);
|
||||
|
||||
private:
|
||||
enum class ManagementState
|
||||
{
|
||||
Idle,
|
||||
Opening,
|
||||
Closing,
|
||||
Open,
|
||||
Error
|
||||
};
|
||||
#if UAMQP_MANAGEMENT_IMPLEMENTATION
|
||||
UniqueAmqpManagementHandle m_management{};
|
||||
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<AMQP_MANAGEMENT_OPEN_RESULT>
|
||||
m_openCompleteQueue;
|
||||
#else
|
||||
std::shared_ptr<MessageSenderImpl> m_messageSender;
|
||||
std::shared_ptr<MessageReceiverImpl> m_messageReceiver;
|
||||
ManagementState m_state = ManagementState::Idle;
|
||||
bool m_messageSenderOpen{false};
|
||||
bool m_messageReceiverOpen{false};
|
||||
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<_internal::ManagementOpenStatus>
|
||||
m_openCompleteQueue;
|
||||
|
||||
uint64_t m_nextMessageId{0};
|
||||
|
||||
// What is the message ID expected for the current outstanding operation?
|
||||
uint64_t m_expectedMessageId;
|
||||
bool m_sendCompleted{false};
|
||||
|
||||
void SetState(ManagementState newState);
|
||||
// Reflect the error state to the OnError callback and return a delivery rejected status.
|
||||
Models::AmqpValue IndicateError(
|
||||
std::string const& errorCondition,
|
||||
std::string const& errorDescription);
|
||||
|
||||
#endif
|
||||
std::string m_managementNodeName;
|
||||
_internal::ManagementClientOptions m_options;
|
||||
std::string m_source;
|
||||
std::shared_ptr<SessionImpl> m_session;
|
||||
_internal::ManagementClientEvents* m_eventHandler{};
|
||||
std::string m_managementEntityPath;
|
||||
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<AMQP_MANAGEMENT_OPEN_RESULT>
|
||||
m_openCompleteQueue;
|
||||
|
||||
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<
|
||||
_internal::ManagementOperationStatus,
|
||||
@ -93,8 +129,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
Models::AmqpMessage>
|
||||
m_messageQueue;
|
||||
|
||||
#if UAMQP_MANAGEMENT_IMPLEMENTATION
|
||||
void CreateManagementClient();
|
||||
|
||||
static void OnExecuteOperationCompleteFn(
|
||||
void* context,
|
||||
AMQP_MANAGEMENT_EXECUTE_OPERATION_RESULT executeResult,
|
||||
@ -103,5 +139,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
|
||||
MESSAGE_HANDLE messageHandle);
|
||||
static void OnManagementErrorFn(void* context);
|
||||
static void OnOpenCompleteFn(void* context, AMQP_MANAGEMENT_OPEN_RESULT openResult);
|
||||
#else
|
||||
|
||||
// Inherited via MessageSenderEvents
|
||||
virtual void OnMessageSenderStateChanged(
|
||||
_internal::MessageSender const& sender,
|
||||
_internal::MessageSenderState newState,
|
||||
_internal::MessageSenderState oldState) override;
|
||||
virtual void OnMessageSenderDisconnected(Models::_internal::AmqpError const& error) override;
|
||||
|
||||
// Inherited via MessageReceiverEvents
|
||||
virtual void OnMessageReceiverStateChanged(
|
||||
_internal::MessageReceiver const& receiver,
|
||||
_internal::MessageReceiverState newState,
|
||||
_internal::MessageReceiverState oldState) override;
|
||||
virtual Models::AmqpValue OnMessageReceived(
|
||||
_internal::MessageReceiver const& receiver,
|
||||
Models::AmqpMessage const& message) override;
|
||||
virtual void OnMessageReceiverDisconnected(Models::_internal::AmqpError const& error) override;
|
||||
#endif
|
||||
};
|
||||
}}}} // namespace Azure::Core::Amqp::_detail
|
||||
|
||||
@ -57,7 +57,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
|
||||
// information gets in the way of the message.
|
||||
if (logCategory == AZ_LOG_TRACE
|
||||
&& (strcmp(func, "log_outgoing_frame") == 0 || strcmp(func, "log_incoming_frame") == 0
|
||||
|| strcmp(func, "log_message_chunk") == 0))
|
||||
|| strcmp(func, "log_message_chunk") == 0 || strcmp(func, "_log_outgoing_frame") == 0
|
||||
|| strcmp(func, "_log_incoming_frame") == 0))
|
||||
{
|
||||
}
|
||||
else
|
||||
|
||||
@ -455,15 +455,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
// described body.
|
||||
AmqpDescribed describedBody(
|
||||
static_cast<std::uint64_t>(AmqpDescriptors::DataAmqpValue), message.m_amqpValueBody);
|
||||
auto serializedBodyValue = AmqpValue::Serialize(describedBody);
|
||||
auto serializedBodyValue = AmqpValue::Serialize(static_cast<AmqpValue>(describedBody));
|
||||
rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end());
|
||||
}
|
||||
break;
|
||||
case MessageBodyType::Data:
|
||||
for (auto const& val : message.m_binaryDataBody)
|
||||
{
|
||||
AmqpDescribed describedBody(static_cast<std::uint64_t>(AmqpDescriptors::DataBinary), val);
|
||||
auto serializedBodyValue = AmqpValue::Serialize(describedBody);
|
||||
AmqpDescribed describedBody(
|
||||
static_cast<std::uint64_t>(AmqpDescriptors::DataBinary), static_cast<AmqpValue>(val));
|
||||
auto serializedBodyValue = AmqpValue::Serialize(static_cast<AmqpValue>(describedBody));
|
||||
rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end());
|
||||
}
|
||||
break;
|
||||
@ -471,8 +472,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
for (auto const& val : message.m_amqpSequenceBody)
|
||||
{
|
||||
AmqpDescribed describedBody(
|
||||
static_cast<std::uint64_t>(AmqpDescriptors::DataAmqpSequence), val);
|
||||
auto serializedBodyValue = AmqpValue::Serialize(describedBody);
|
||||
static_cast<std::uint64_t>(AmqpDescriptors::DataAmqpSequence),
|
||||
static_cast<AmqpValue>(val));
|
||||
auto serializedBodyValue = AmqpValue::Serialize(static_cast<AmqpValue>(describedBody));
|
||||
rv.insert(rv.end(), serializedBodyValue.begin(), serializedBodyValue.end());
|
||||
}
|
||||
}
|
||||
|
||||
@ -251,6 +251,38 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
|
||||
|
||||
bool AmqpValue::operator==(AmqpValue const& that) const
|
||||
{
|
||||
// If both values are null, they are equal.
|
||||
if (IsNull() && that.IsNull())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
// If only one of the values is null, they are not equal.
|
||||
if (IsNull() || that.IsNull())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
// If the types are not equal, they are not equal.
|
||||
if (GetType() != that.GetType())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
// The uAMQP function `amqpvalue_are_equal` does not work for all types, so we need to do some
|
||||
// special handling for those which are not handled properly.
|
||||
if (GetType() == AmqpValueType::Composite || GetType() == AmqpValueType::Described)
|
||||
{
|
||||
if (GetType() == AmqpValueType::Composite)
|
||||
{
|
||||
AmqpComposite thisComposite{this->AsComposite()};
|
||||
AmqpComposite thatComposite{that.AsComposite()};
|
||||
return (thisComposite == thatComposite);
|
||||
}
|
||||
else
|
||||
{
|
||||
AmqpDescribed thisDescribed{this->AsDescribed()};
|
||||
AmqpDescribed thatDescribed{that.AsDescribed()};
|
||||
return (thisDescribed == thatDescribed);
|
||||
}
|
||||
}
|
||||
return amqpvalue_are_equal(m_value.get(), that.m_value.get());
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,8 @@ TEST_F(TestMessage, SimpleCreate)
|
||||
|
||||
{
|
||||
AmqpMessage message1;
|
||||
message1.Properties.MessageId = 12345;
|
||||
message1.SetBody("Hello world");
|
||||
AmqpMessage message2(std::move(message1));
|
||||
AmqpMessage message3(message2);
|
||||
AmqpMessage message4;
|
||||
@ -126,7 +128,7 @@ TEST_F(TestMessage, TestBodyAmqpSequence)
|
||||
{
|
||||
AmqpMessage message;
|
||||
|
||||
message.SetBody({"Test", 95, AmqpMap{{3, 5}, {4, 9}}});
|
||||
message.SetBody({"Test", 95, static_cast<AmqpValue>(AmqpMap{{3, 5}, {4, 9}})});
|
||||
|
||||
EXPECT_EQ(1, message.GetBodyAsAmqpList().size());
|
||||
EXPECT_EQ("Test", static_cast<std::string>(message.GetBodyAsAmqpList()[0].at(0)));
|
||||
@ -145,7 +147,8 @@ TEST_F(TestMessage, TestBodyAmqpSequence)
|
||||
}
|
||||
{
|
||||
AmqpMessage message;
|
||||
message.SetBody({{1}, {"Test", 3}, {"Test", 95, AmqpMap{{3, 5}, {4, 9}}}});
|
||||
message.SetBody(
|
||||
{{1}, {"Test", 3}, {"Test", 95, static_cast<AmqpValue>(AmqpMap{{3, 5}, {4, 9}})}});
|
||||
EXPECT_EQ(3, message.GetBodyAsAmqpList().size());
|
||||
EXPECT_EQ("Test", static_cast<std::string>(message.GetBodyAsAmqpList()[1].at(0)));
|
||||
EXPECT_EQ(95, static_cast<int32_t>(message.GetBodyAsAmqpList()[2].at(1)));
|
||||
@ -223,7 +226,7 @@ TEST_F(MessageSerialization, SerializeMessageBodyBinary)
|
||||
std::vector<uint8_t> buffer;
|
||||
AmqpMessage message;
|
||||
message.Properties.MessageId = "12345";
|
||||
message.SetBody(AmqpMap{{"key1", "value1"}, {"key2", "value2"}});
|
||||
message.SetBody(static_cast<AmqpValue>(AmqpMap{{"key1", "value1"}, {"key2", "value2"}}));
|
||||
buffer = AmqpMessage::Serialize(message);
|
||||
AmqpMessage deserialized = AmqpMessage::Deserialize(buffer.data(), buffer.size());
|
||||
EXPECT_EQ(message, deserialized);
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
// SPDX-Licence-Identifier: MIT
|
||||
|
||||
#include "azure/core/amqp/common/global_state.hpp"
|
||||
#include "azure/core/amqp/models/amqp_error.hpp"
|
||||
#include "azure/core/amqp/models/amqp_value.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
@ -261,6 +262,42 @@ TEST_F(TestValues, TestList)
|
||||
EXPECT_EQ(AmqpValue('a'), list2.at(3));
|
||||
EXPECT_FALSE(list1 < list2);
|
||||
}
|
||||
|
||||
{
|
||||
AmqpList test;
|
||||
AmqpDescribed desc{
|
||||
static_cast<uint64_t>(29),
|
||||
static_cast<AmqpValue>(AmqpList{AmqpValue{"test:error"}, AmqpValue{"test description"}})};
|
||||
test.push_back(AmqpValue{desc});
|
||||
EXPECT_EQ(1, test.size());
|
||||
EXPECT_EQ(AmqpValueType::Described, test[0].GetType());
|
||||
|
||||
AmqpList list2{test};
|
||||
EXPECT_EQ(1, list2.size());
|
||||
EXPECT_EQ(AmqpValueType::Described, list2[0].GetType());
|
||||
AmqpDescribed desc2{list2[0].AsDescribed()};
|
||||
EXPECT_EQ(desc2.GetDescriptor(), AmqpValue{static_cast<uint64_t>(29ll)});
|
||||
}
|
||||
|
||||
{
|
||||
AmqpList test;
|
||||
_internal::AmqpError error;
|
||||
error.Condition = _internal::AmqpErrorCondition("test:error");
|
||||
error.Description = "test description";
|
||||
test.push_back(_internal::AmqpErrorFactory::ToAmqp(error));
|
||||
EXPECT_EQ(1, test.size());
|
||||
EXPECT_EQ(AmqpValueType::Composite, test[0].GetType());
|
||||
AmqpComposite testAsComposite{test[0].AsComposite()};
|
||||
EXPECT_EQ(testAsComposite.GetDescriptor(), AmqpValue{static_cast<uint64_t>(29ll)});
|
||||
{
|
||||
AmqpValue testAsValue{test};
|
||||
EXPECT_EQ(AmqpValueType::List, testAsValue.GetType());
|
||||
|
||||
auto testAsList{testAsValue.AsList()};
|
||||
EXPECT_EQ(AmqpValueType::Composite, testAsList[0].GetType());
|
||||
EXPECT_EQ(test[0], testAsList[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestValues, TestMap)
|
||||
@ -308,7 +345,7 @@ TEST_F(TestValues, TestArray)
|
||||
|
||||
EXPECT_EQ(5, array1.size());
|
||||
|
||||
AmqpValue value = array1;
|
||||
AmqpValue value = static_cast<AmqpValue>(array1);
|
||||
EXPECT_EQ(AmqpValueType::Array, value.GetType());
|
||||
|
||||
const AmqpArray array2 = value.AsArray();
|
||||
@ -398,7 +435,7 @@ TEST_F(TestValues, TestCompositeValue)
|
||||
// Put some things in the map.
|
||||
{
|
||||
AmqpComposite compositeVal(static_cast<uint64_t>(116ull), {25, 25.0f});
|
||||
AmqpValue value = compositeVal;
|
||||
AmqpValue value = static_cast<AmqpValue>(compositeVal);
|
||||
AmqpComposite testVal(value.AsComposite());
|
||||
|
||||
EXPECT_EQ(compositeVal.size(), testVal.size());
|
||||
@ -419,7 +456,7 @@ TEST_F(TestValues, TestDescribed)
|
||||
EXPECT_EQ(AmqpSymbol("My Composite Type"), described1.GetDescriptor().AsSymbol());
|
||||
EXPECT_EQ(5, static_cast<int32_t>(described1.GetValue()));
|
||||
|
||||
AmqpValue value = described1;
|
||||
AmqpValue value = static_cast<AmqpValue>(described1);
|
||||
EXPECT_EQ(AmqpValueType::Described, value.GetType());
|
||||
|
||||
AmqpDescribed described2 = value.AsDescribed();
|
||||
@ -436,7 +473,7 @@ TEST_F(TestValues, TestDescribed)
|
||||
EXPECT_EQ(937, static_cast<uint64_t>(value.GetDescriptor()));
|
||||
EXPECT_EQ(5, static_cast<int32_t>(value.GetValue()));
|
||||
|
||||
AmqpValue value2 = value;
|
||||
AmqpValue value2 = static_cast<AmqpValue>(value);
|
||||
|
||||
AmqpDescribed described2 = value2.AsDescribed();
|
||||
EXPECT_EQ(AmqpValueType::Described, value2.GetType());
|
||||
|
||||
@ -59,7 +59,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
Session session{connection.CreateSession({})};
|
||||
ManagementClientOptions options;
|
||||
options.EnableTrace = 1;
|
||||
ManagementClient management(session.CreateManagementClient("Test", {}));
|
||||
ManagementClient management(session.CreateManagementClient("Test", options));
|
||||
|
||||
mockServer.StartListening();
|
||||
|
||||
@ -103,16 +103,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
if (receiver.GetSourceName() != "$management" && receiver.GetSourceName() != "$cbs")
|
||||
{
|
||||
GTEST_LOG_(INFO) << "Rejecting message because it is for an unexpected node name.";
|
||||
return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected(
|
||||
auto rv = Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected(
|
||||
"test:Rejected", "Unknown message source.");
|
||||
GTEST_LOG_(INFO) << "RV=" << rv;
|
||||
return rv;
|
||||
}
|
||||
// If this is coming on the management node, we only support the Test operation.
|
||||
if (receiver.GetSourceName() == "$management"
|
||||
&& incomingMessage.ApplicationProperties.at("operation") != "Test")
|
||||
{
|
||||
GTEST_LOG_(INFO) << "Rejecting message because is for an unknown operation.";
|
||||
return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected(
|
||||
GTEST_LOG_(INFO) << "Rejecting message because it is for an unknown operation.";
|
||||
auto rv = Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected(
|
||||
"amqp:status:rejected", "Unknown Request operation");
|
||||
GTEST_LOG_(INFO) << "RV=" << rv;
|
||||
return rv;
|
||||
}
|
||||
return AmqpServerMock::OnMessageReceived(receiver, incomingMessage);
|
||||
}
|
||||
@ -283,8 +287,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
|
||||
auto response = management.ExecuteOperation("Test", "Type", "Locales", messageToSend);
|
||||
EXPECT_EQ(response.Status, ManagementOperationStatus::Error);
|
||||
EXPECT_EQ(response.StatusCode, 0);
|
||||
EXPECT_EQ(response.Description, "Error processing management operation.");
|
||||
EXPECT_EQ(response.StatusCode, 500);
|
||||
EXPECT_EQ(response.Description, "Received message statusCode value is not an int.");
|
||||
management.Close();
|
||||
|
||||
mockServer.StopListening();
|
||||
@ -298,7 +302,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
|
||||
struct ManagementEventsHandler : public ManagementClientEvents
|
||||
{
|
||||
void OnError() override { Error = true; }
|
||||
void OnError(Azure::Core::Amqp::Models::_internal::AmqpError const&) override
|
||||
{
|
||||
Error = true;
|
||||
}
|
||||
bool Error{false};
|
||||
};
|
||||
ManagementEventsHandler managementEvents;
|
||||
@ -330,8 +337,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
|
||||
auto response = management.ExecuteOperation("Test", "Type", "Locales", messageToSend);
|
||||
EXPECT_EQ(response.Status, ManagementOperationStatus::Error);
|
||||
EXPECT_EQ(response.StatusCode, 0);
|
||||
EXPECT_EQ(response.Description, "Error processing management operation.");
|
||||
EXPECT_EQ(response.StatusCode, 500);
|
||||
EXPECT_EQ(
|
||||
response.Description, "Received message does not have a statusCode status code key.");
|
||||
EXPECT_TRUE(managementEvents.Error);
|
||||
management.Close();
|
||||
|
||||
@ -410,8 +418,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
Azure::Core::Context::ApplicationContext.WithDeadline(
|
||||
std::chrono::system_clock::now() + std::chrono::seconds(10)));
|
||||
EXPECT_EQ(response.Status, ManagementOperationStatus::Error);
|
||||
EXPECT_EQ(response.StatusCode, 0);
|
||||
EXPECT_EQ(response.Description, "");
|
||||
EXPECT_EQ(response.StatusCode, 500);
|
||||
EXPECT_EQ(response.Description, "Unknown Request operation");
|
||||
management.Close();
|
||||
|
||||
mockServer.StopListening();
|
||||
|
||||
@ -226,13 +226,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
|
||||
receiverOptions.MessageTarget = messageTarget;
|
||||
receiverOptions.Name = name;
|
||||
receiverOptions.SettleMode = Azure::Core::Amqp::_internal::ReceiverSettleMode::First;
|
||||
receiverOptions.DynamicAddress = messageSource.GetDynamic();
|
||||
receiverOptions.EnableTrace = true;
|
||||
auto receiver = std::make_unique<MessageReceiver>(session.CreateMessageReceiver(
|
||||
newLinkInstance,
|
||||
static_cast<std::string>(messageSource.GetAddress()),
|
||||
receiverOptions,
|
||||
this));
|
||||
auto receiver = std::make_unique<MessageReceiver>(
|
||||
session.CreateMessageReceiver(newLinkInstance, messageSource, receiverOptions, this));
|
||||
GTEST_LOG_(INFO) << "Opening the message receiver.";
|
||||
receiver->Open();
|
||||
m_messageReceiverQueue.CompleteOperation(std::move(receiver));
|
||||
|
||||
@ -33,12 +33,12 @@ TEST_F(TestSourceTarget, SimpleSourceTarget)
|
||||
|
||||
{
|
||||
EXPECT_ANY_THROW(MessageSource source(AmqpValue{}));
|
||||
AmqpValue val = AmqpArray();
|
||||
AmqpValue val = static_cast<AmqpValue>(AmqpArray());
|
||||
EXPECT_ANY_THROW(MessageSource source{val});
|
||||
}
|
||||
{
|
||||
EXPECT_ANY_THROW(MessageTarget target(AmqpValue{}));
|
||||
AmqpValue val = AmqpArray();
|
||||
AmqpValue val = static_cast<AmqpValue>(AmqpArray());
|
||||
EXPECT_ANY_THROW(MessageTarget target(val));
|
||||
}
|
||||
}
|
||||
@ -79,7 +79,7 @@ TEST_F(TestSourceTarget, TargetProperties)
|
||||
|
||||
{
|
||||
MessageTargetOptions options;
|
||||
options.Capabilities.push_back(AmqpSymbol{"Test"});
|
||||
options.Capabilities.push_back(static_cast<AmqpValue>(AmqpSymbol{"Test"}));
|
||||
MessageTarget target(options);
|
||||
EXPECT_EQ(1, target.GetCapabilities().size());
|
||||
EXPECT_EQ(AmqpValueType::Symbol, target.GetCapabilities()[0].GetType());
|
||||
@ -238,7 +238,7 @@ TEST_F(TestSourceTarget, SourceProperties)
|
||||
|
||||
{
|
||||
MessageSourceOptions options;
|
||||
options.Capabilities.push_back(AmqpSymbol{"Test"});
|
||||
options.Capabilities.push_back(static_cast<AmqpValue>(AmqpSymbol{"Test"}));
|
||||
MessageSource source(options);
|
||||
EXPECT_EQ(1, source.GetCapabilities().size());
|
||||
EXPECT_EQ(AmqpValueType::Symbol, source.GetCapabilities()[0].GetType());
|
||||
@ -374,7 +374,7 @@ TEST_F(TestSourceTarget, SourceProperties)
|
||||
|
||||
{
|
||||
MessageSourceOptions options;
|
||||
options.Outcomes.push_back(AmqpSymbol("Test"));
|
||||
options.Outcomes.push_back(static_cast<AmqpValue>(AmqpSymbol("Test")));
|
||||
MessageSource source(options);
|
||||
EXPECT_EQ(1, source.GetOutcomes().size());
|
||||
EXPECT_EQ(AmqpValueType::Symbol, source.GetOutcomes().at(0).GetType());
|
||||
|
||||
@ -13,6 +13,9 @@ include(AzureBuildTargetForCI)
|
||||
include(GoogleTest)
|
||||
|
||||
find_package(uamqp CONFIG REQUIRED)
|
||||
find_package(umock_c)
|
||||
find_package(azure_macro_utils_c CONFIG REQUIRED)
|
||||
find_package(azure_c_shared_utility CONFIG REQUIRED)
|
||||
|
||||
# Unit Tests
|
||||
add_executable(azure-core-amqp-tests-uamqp
|
||||
@ -39,7 +42,7 @@ endif()
|
||||
target_include_directories(azure-core-amqp-tests-uamqp PRIVATE Azure::azure-core-amqp)
|
||||
|
||||
# Link test executable against gtest & gtest_main
|
||||
target_link_libraries(azure-core-amqp-tests-uamqp PRIVATE GTest::gtest_main Azure::azure-core-amqp uamqp)
|
||||
target_link_libraries(azure-core-amqp-tests-uamqp PRIVATE GTest::gtest_main Azure::azure-core-amqp uamqp umock_c azure_macro_utils_c)
|
||||
gtest_discover_tests( azure-core-amqp-tests-uamqp )
|
||||
|
||||
create_per_service_target_build(core azure-core-amqp-tests-uamqp)
|
||||
|
||||
@ -21,14 +21,14 @@ TEST(TracingContextFactory, ServiceTraceEnums)
|
||||
spanKind = SpanKind::Producer;
|
||||
spanKind = Azure::Core::Tracing::_internal::SpanKind::Server;
|
||||
int i = static_cast<int>(spanKind);
|
||||
i += 1;
|
||||
(void)i; // silence clang 14 warning about unused variable i.
|
||||
}
|
||||
{
|
||||
SpanStatus spanStatus = SpanStatus::Unset;
|
||||
spanStatus = SpanStatus::Error;
|
||||
spanStatus = SpanStatus::Ok;
|
||||
int i = static_cast<int>(spanStatus);
|
||||
i += 1;
|
||||
(void)i; // silence clang 14 warning about unused variable i.
|
||||
}
|
||||
Azure::Core::Tracing::_internal::CreateSpanOptions options;
|
||||
options.Kind = SpanKind::Internal;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user