diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp index 2c25ef492..e2692f492 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp @@ -66,6 +66,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { }; #if defined(_azure_TESTING_BUILD) + + // Note that this entire class is a test hook to enable testing of the Link family of apis. It is + // not exposed to customers because there are no customer scenarios for it. class Link; class LinkImplEvents; class LinkImplEventsImpl; diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/message_receiver.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/message_receiver.hpp index 8e17ed225..97aed6e9b 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/message_receiver.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/message_receiver.hpp @@ -23,6 +23,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { class MessageReceiverFactory; }}}} // namespace Azure::Core::Amqp::_detail +#if defined(_azure_TESTING_BUILD) +namespace Azure { namespace Core { namespace Amqp { namespace Tests { namespace MessageTests { + class MockServiceEndpoint; +}}}}} // namespace Azure::Core::Amqp::Tests::MessageTests +#endif + namespace Azure { namespace Core { namespace Amqp { namespace _internal { enum class MessageReceiverState { @@ -180,5 +186,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { MessageReceiver(std::shared_ptr<_detail::MessageReceiverImpl> impl) : m_impl{impl} {} friend class _detail::MessageReceiverFactory; std::shared_ptr<_detail::MessageReceiverImpl> m_impl; + +#if _azure_TESTING_BUILD + friend class Azure::Core::Amqp::Tests::MessageTests::MockServiceEndpoint; + + // There is a deadlock associated with the link polling if it is enabled from an AMQP event + // callback. To work around this, link polling is disabled when creating a message receiver from + // an existing link endpoint. This method should be called to enable it at a time when it is + // safer to enable link polling. + + // This is a test hook and should not be used outside of test code. + void EnableLinkPolling(); +#endif }; }}}} // namespace Azure::Core::Amqp::_internal diff --git a/sdk/core/azure-core-amqp/src/amqp/link.cpp b/sdk/core/azure-core-amqp/src/amqp/link.cpp index 75614595b..c7e5e29e2 100644 --- a/sdk/core/azure-core-amqp/src/amqp/link.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/link.cpp @@ -7,6 +7,7 @@ #include "../models/private/performatives/transfer_impl.hpp" #include "../models/private/value_impl.hpp" #include "azure/core/amqp/internal/common/completion_operation.hpp" +#include "azure/core/amqp/internal/common/global_state.hpp" #include "azure/core/amqp/internal/models/message_source.hpp" #include "azure/core/amqp/internal/models/message_target.hpp" #include "private/link_impl.hpp" @@ -147,7 +148,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { uint32_t Link::GetReceivedMessageId() const { return m_impl->GetReceivedMessageId(); } - void Link::Attach() { return m_impl->Attach(); } + void Link::Attach() + { + Azure::Core::Amqp::Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable( + m_impl); + return m_impl->Attach(); + } std::tuple Link::Transfer( std::vector const& payload, @@ -162,7 +168,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { std::string const& errorDescription, Models::AmqpValue const& info) { - return m_impl->Detach(close, errorCondition, errorDescription, info); + m_impl->Detach(close, errorCondition, errorDescription, info); + Azure::Core::Amqp::Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable( + m_impl); } #endif // _azure_TESTING_BUILD @@ -196,6 +204,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { return os; } + std::ostream& operator<<(std::ostream& os, LINK_TRANSFER_RESULT transfer) + { + switch (transfer) + { + case LINK_TRANSFER_BUSY: + os << "LINK_TRANSFER_BUSY"; + break; + case LINK_TRANSFER_ERROR: + os << "LINK_TRANSFER_ERROR"; + break; + default: + os << "Unknown (" << transfer << ")"; + break; + } + return os; + } + /****/ /* LINK Implementation */ @@ -210,6 +235,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { Models::AmqpValue sourceValue{source.AsAmqpValue()}; Models::AmqpValue targetValue(target.AsAmqpValue()); + auto connectionLock{m_session->GetConnection()->Lock()}; m_link = link_create( *session, name.c_str(), @@ -230,6 +256,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { Models::AmqpValue sourceValue(source.AsAmqpValue()); Models::AmqpValue targetValue(target.AsAmqpValue()); + auto connectionLock{m_session->GetConnection()->Lock()}; m_link = link_create_from_endpoint( *session, LinkEndpointFactory::Release(linkEndpoint), @@ -645,7 +672,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { 0 /*timeout*/); if (asyncResult == nullptr) { - throw std::runtime_error("Could not send message"); + std::stringstream ss; + ss << "Could not send message: " << transferResult; + throw std::runtime_error(ss.str()); } } diff --git a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp index 05660ad79..bb6c29d1a 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp @@ -129,6 +129,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { return stream; } +#if defined(_azure_TESTING_BUILD) + void MessageReceiver::EnableLinkPolling() + { + if (m_impl) + { + m_impl->EnableLinkPolling(); + } + else + { + AZURE_ASSERT_FALSE("MessageReceiver::EnableLinkPolling called on moved message receiver."); + } + } +#endif + }}}} // namespace Azure::Core::Amqp::_internal namespace Azure { namespace Core { namespace Amqp { namespace _detail { @@ -160,6 +174,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { *m_link, MessageReceiverImpl::OnMessageReceiverStateChangedFn, this)); messagereceiver_set_trace(m_messageReceiver.get(), options.EnableTrace); + + // When creating a message receiver from a link endpoint, we don't want to enable polling on the + // link at open time (because the Open call is made with the ConnectionLock held, resulting in a + // deadlock. + // + // Instead, we'll defer the link polling until after MessageReceiver is opened and it's safe to + // do so. + m_deferLinkPolling = true; } void MessageReceiverImpl::CreateLink(LinkEndpoint& endpoint) @@ -335,6 +357,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { } } + void MessageReceiverImpl::EnableLinkPolling() + { + std::unique_lock lock{m_mutableState}; + if (!m_linkPollingEnabled) + { + Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link); + m_linkPollingEnabled = true; + } + } + MessageReceiverImpl::~MessageReceiverImpl() noexcept { auto lock{m_session->GetConnection()->Lock()}; @@ -512,6 +544,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { Log::Stream(Logger::Level::Verbose) << "Opening message receiver. Start async"; } + // Mark the connection as async so that we can use the async APIs. m_session->GetConnection()->EnableAsyncOperation(true); } @@ -524,7 +557,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { // // This can result in a deadlock because the polling thread is also going to acquire the // connection lock resulting in a deadlock. - Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link); + // If we're not deferring link polling, enable the async operation on the connection. + if (!m_deferLinkPolling) + { + EnableLinkPolling(); + } } void MessageReceiverImpl::Close(Context const& context) @@ -541,9 +578,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { bool shouldWaitForClose = m_currentState == _internal::MessageReceiverState::Closing || m_currentState == _internal::MessageReceiverState::Open; - Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable( - m_link); // This will ensure that the link is cleaned up on the next poll() - + { + std::unique_lock lock{m_mutableState}; + if (m_linkPollingEnabled) + { + Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable( + m_link); // This will ensure that the link is cleaned up on the next poll() + m_linkPollingEnabled = false; + } + } { auto lock{m_session->GetConnection()->Lock()}; diff --git a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp index c33cf651e..30fe682c0 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp @@ -392,7 +392,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { auto result = m_openQueue.WaitForResult(context); if (!result || std::get<0>(*result)) { - if (m_options.EnableTrace) { Log::Stream(Logger::Level::Verbose) << "Opening message sender. Enable async operation."; diff --git a/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp index 0f4c479bb..aff182f87 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp @@ -70,6 +70,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { std::pair, Models::_internal::AmqpError> TryWaitForIncomingMessage(); + void EnableLinkPolling(); + private: UniqueMessageReceiver m_messageReceiver{}; bool m_receiverOpen{false}; @@ -79,6 +81,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { std::shared_ptr<_detail::SessionImpl> m_session; Models::_internal::AmqpError m_savedMessageError{}; _internal::MessageReceiverState m_currentState{}; + bool m_deferLinkPolling{false}; + + bool m_linkPollingEnabled{false}; + std::mutex m_mutableState; Azure::Core::Amqp::Common::_internal:: AsyncOperationQueue, Models::_internal::AmqpError> diff --git a/sdk/core/azure-core-amqp/src/models/amqp_error.cpp b/sdk/core/azure-core-amqp/src/models/amqp_error.cpp index f18afc183..602b228dd 100644 --- a/sdk/core/azure-core-amqp/src/models/amqp_error.cpp +++ b/sdk/core/azure-core-amqp/src/models/amqp_error.cpp @@ -99,7 +99,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace } else { - os << "Error {null}"; + os << "Error: {No Error}"; } return os; } diff --git a/sdk/core/azure-core-amqp/test/ut/link_tests.cpp b/sdk/core/azure-core-amqp/test/ut/link_tests.cpp index fea904c26..e222c3327 100644 --- a/sdk/core/azure-core-amqp/test/ut/link_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/link_tests.cpp @@ -3,21 +3,13 @@ #include "azure/core/amqp/internal/common/async_operation_queue.hpp" #include "azure/core/amqp/internal/connection.hpp" -#include "azure/core/amqp/internal/message_receiver.hpp" -#include "azure/core/amqp/internal/message_sender.hpp" -#include "azure/core/amqp/internal/models/messaging_values.hpp" #include "azure/core/amqp/internal/models/performatives/amqp_transfer.hpp" #include "azure/core/amqp/internal/network/amqp_header_detect_transport.hpp" #include "azure/core/amqp/internal/network/socket_listener.hpp" -#include "azure/core/amqp/internal/network/socket_transport.hpp" #include "azure/core/amqp/internal/session.hpp" #include "mock_amqp_server.hpp" -#include - -#include #include -#include #include @@ -329,6 +321,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { return std::get<0>(*result); } + void WaitForLinkState(LinkState state, Azure::Core::Context const& context) + { + LinkState result; + do + { + result = WaitForLink(context); + GTEST_LOG_(INFO) << "Link state changed to: " << result; + } while (result != state); + } + private: void OnLinkFlowOn(Link const& link) override { @@ -357,7 +359,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { Azure::Core::Amqp::Common::_internal::AsyncOperationQueue m_linkStateQueue; }; - + Azure::Core::Context timeoutContext = Azure::Core::Context::ApplicationContext.WithDeadline( + Azure::DateTime::clock::now() + std::chrono::seconds(60)); Link keepAliveLink{ session, "KeepConnectionAlive", SessionRole::Receiver, "MyTarget", "TestReceiver"}; keepAliveLink.Attach(); @@ -367,49 +370,37 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { Link link(session, "MySession", SessionRole::Sender, "MySource", "MyTarget", &linkEvents); link.Attach(); - LinkState linkState; - // Iterate until the state changes to Attached. - do - { - linkState = linkEvents.WaitForLink({}); - } while (linkState != LinkState::Attached); + linkEvents.WaitForLinkState(LinkState::Attached, timeoutContext); Models::AmqpMessage message; message.SetBody("Hello"); - link.Transfer(Models::AmqpMessage::Serialize(message), {}); + link.Transfer(Models::AmqpMessage::Serialize(message), timeoutContext); + + Azure::Core::Amqp::Models::AmqpValue data; + + link.Detach(true, {}, {}, data); + // Iterate until the state changes to Detached. + linkEvents.WaitForLinkState(LinkState::Detached, timeoutContext); + } + + { + ClientLinkEvents linkEvents; + Link link(session, "MySession2", SessionRole::Sender, "MySource", "MyTarget", &linkEvents); + + link.Attach(); + + // Iterate until the state changes to Attached. + linkEvents.WaitForLinkState(LinkState::Attached, timeoutContext); Azure::Core::Amqp::Models::AmqpValue data; link.Detach(true, {}, {}, data); // Iterate until the state changes to Detached. - do - { - linkState = linkEvents.WaitForLink({}); - } while (linkState != LinkState::Detached); + linkEvents.WaitForLinkState(LinkState::Detached, timeoutContext); } - { - ClientLinkEvents linkEvents; - Link link(session, "MySession2", SessionRole::Sender, "MySource", "MyTarget", &linkEvents); - link.Attach(); - LinkState linkState; - - // Iterate until the state changes to Attached. - do - { - linkState = linkEvents.WaitForLink({}); - } while (linkState != LinkState::Attached); - - Azure::Core::Amqp::Models::AmqpValue data; - link.Detach(true, {}, {}, data); - // Iterate until the state changes to Attached. - do - { - linkState = linkEvents.WaitForLink({}); - } while (linkState != LinkState::Detached); - } { constexpr const size_t linkCount = 20; @@ -417,7 +408,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { std::vector> linkEvents; for (size_t i = 0; i < linkCount; i += 1) { - // Create 100 links on the session. + // Create linkCount links on the session. linkEvents.push_back(std::make_unique()); links.push_back(Link{ session, @@ -429,24 +420,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { } for (size_t i = 0; i < linkCount; i += 1) { - links[i].Attach(); // Iterate until the state changes to Attached. - LinkState linkState; - - // Wait for the links to attach. - do - { - linkState = linkEvents[i]->WaitForLink({}); - } while (linkState != LinkState::Attached); + links[i].Attach(); + // Iterate until the state changes to Attached. + linkEvents[i]->WaitForLinkState(LinkState::Attached, timeoutContext); } + for (size_t i = 0; i < linkCount; i += 1) { links[i].Detach(true, "", "", Models::AmqpValue{}); // Iterate until the state changes to Detached. - LinkState linkState; - do - { - linkState = linkEvents[i]->WaitForLink({}); - } while (linkState != LinkState::Detached); + linkEvents[i]->WaitForLinkState(LinkState::Detached, timeoutContext); } } diff --git a/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp b/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp index 7eae08146..99304fdd3 100644 --- a/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp +++ b/sdk/core/azure-core-amqp/test/ut/mock_amqp_server.hpp @@ -5,12 +5,10 @@ #include "azure/core/amqp/internal/models/amqp_error.hpp" #include "azure/core/amqp/models/amqp_message.hpp" -#include #include #include #include #include -#include #include #include #include @@ -91,7 +89,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { senderOptions.InitialDeliveryCount = 0; m_sender[linkName] = std::make_unique( session.CreateMessageSender(linkEndpoint, target, senderOptions, this)); - (void)!m_sender[linkName]->HalfOpen(); + // NOTE: The linkEndpoint needs to be attached before this function returns in order to + // correctly process incoming attach requests. Otherwise, the attach request will be + // discarded, and the link will be in a half attached state. + (void)!m_sender[linkName]->HalfOpen(m_listenerContext); } else { @@ -117,8 +118,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { receiverOptions.InitialDeliveryCount = 0; m_receiver[linkName] = std::make_unique( session.CreateMessageReceiver(linkEndpoint, source, receiverOptions, this)); - GTEST_LOG_(INFO) << "Open new message receiver."; - m_receiver[linkName]->Open(); + + // NOTE: The linkEndpoint needs to be attached before this function returns in order to + // correctly process incoming attach requests. Otherwise, the attach request will be + // discarded, and the link will be in a half attached state. + + // Note that there is a potential deadlock when opening the message receiver - the + // connection lock cannot be held when link polling is enabled on the incoming link. + // This is because the link polling will try to acquire the connection lock, which is + // already held by the current thread. To avoid this deadlock, we defer enabling link + // polling until later when it is safe to do so. + m_receiver[linkName]->Open(m_listenerContext); + + // Now that the receiver is open, we can enable link polling from the message loop. + m_receiverPollingEnableQueue.CompleteOperation(linkName); } else { @@ -247,6 +260,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { Azure::Core::Context& GetListenerContext() { return m_listenerContext; } + Azure::Core::Amqp::Models::AmqpValue OnMessageReceived( + Azure::Core::Amqp::_internal::MessageReceiver const& receiver, + std::shared_ptr const& message) override + { + GTEST_LOG_(INFO) << "MockServiceEndpoint(" << m_name << ") Received a message " << *message; + m_messageQueue.CompleteOperation(receiver.GetLinkName(), message); + return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryAccepted(); + } + private: Azure::Core::Context m_listenerContext; // Used to cancel the listener if necessary. bool m_enableTrace{true}; @@ -256,6 +278,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { std::map> m_receiver; + Azure::Core::Amqp::Common::_internal::AsyncOperationQueue + m_receiverPollingEnableQueue; Azure::Core::Amqp::Common::_internal:: AsyncOperationQueue> m_messageQueue; @@ -283,6 +307,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { MessageReceived(std::get<0>(*message), std::get<1>(*message)); } + auto senderDisconnected = m_messageSenderDisconnectedQueue.TryWaitForResult(); if (senderDisconnected) { @@ -305,6 +330,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { receiver->Close(m_listenerContext); } + auto receiverPollingEnable = m_receiverPollingEnableQueue.TryWaitForResult(); + if (receiverPollingEnable) + { + std::string receiverName = std::get<0>(*receiverPollingEnable); + GTEST_LOG_(INFO) << "Enable link polling for receiver: " << receiverName; + m_receiver[receiverName]->EnableLinkPolling(); + } + if (m_receiver.empty() && m_sender.empty()) { GTEST_LOG_(INFO) << "No more links, exiting message loop."; @@ -326,17 +359,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { << " New state: " << newState; } - protected: - Azure::Core::Amqp::Models::AmqpValue OnMessageReceived( - Azure::Core::Amqp::_internal::MessageReceiver const& receiver, - std::shared_ptr const& message) override - { - GTEST_LOG_(INFO) << "MockServiceEndpoint(" << m_name << ") Received a message " << *message; - m_messageQueue.CompleteOperation(receiver.GetLinkName(), message); - return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryAccepted(); - } - - private: virtual void OnMessageReceiverDisconnected( Azure::Core::Amqp::_internal::MessageReceiver const& receiver, Azure::Core::Amqp::Models::_internal::AmqpError const& error) override @@ -554,10 +576,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { m_serviceEndpoints.push_back(endpoint); } - private: - std::vector> m_serviceEndpoints; - - public: uint16_t GetPort() const { return m_testPort; } Azure::Core::Context& GetListenerContext() { return m_listenerContext; } @@ -649,29 +667,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { void EnableTrace(bool enableTrace) { m_enableTrace = enableTrace; } - private: - // The set of incoming connections, used when tearing down the mock server. - std::list> m_connections; - - // The set of sessions. - std::list> m_sessions; - - // bool m_connectionValid{false}; - bool m_enableTrace{true}; - bool m_listening{false}; - - std::string m_connectionId; - std::thread m_serverThread; - std::uint16_t m_testPort; - protected: - // For each incoming message source, we create a queue of messages intended for that - // message source. - // - // Each message queue is keyed by the message-id. - // std::map < std::string, MessageLinkComponents> m_linkMessageQueues; - Azure::Core::Context m_listenerContext; // Used to cancel the listener if necessary. - // Inherited from SocketListenerEvents virtual void OnSocketAccepted( std::shared_ptr transport) override @@ -764,6 +760,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { GTEST_LOG_(INFO) << "Unknown endpoint name: " << endpointName; return false; } + + private: + std::vector> m_serviceEndpoints; + + // The set of incoming connections, used when tearing down the mock server. + std::list> m_connections; + + // The set of sessions. + std::list> m_sessions; + + bool m_enableTrace{true}; + bool m_listening{false}; + + std::string m_connectionId; + std::thread m_serverThread; + std::uint16_t m_testPort; + + Azure::Core::Context m_listenerContext; // Used to cancel the listener if necessary. }; } // namespace MessageTests }}}} // namespace Azure::Core::Amqp::Tests diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h index 310797c7c..6138ffde9 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/connection.h @@ -92,6 +92,7 @@ extern "C" { MOCKABLE_FUNCTION(, int, connection_get_properties, CONNECTION_HANDLE, connection, fields*, properties); MOCKABLE_FUNCTION(, int, connection_get_remote_max_frame_size, CONNECTION_HANDLE, connection, uint32_t*, remote_max_frame_size); MOCKABLE_FUNCTION(, int, connection_set_remote_idle_timeout_empty_frame_send_ratio, CONNECTION_HANDLE, connection, double, idle_timeout_empty_frame_send_ratio); + MOCKABLE_FUNCTION(, char*, connection_get_container_id, CONNECTION_HANDLE, connection); MOCKABLE_FUNCTION(, uint64_t, connection_handle_deadlines, CONNECTION_HANDLE, connection); MOCKABLE_FUNCTION(, void, connection_dowork, CONNECTION_HANDLE, connection); MOCKABLE_FUNCTION(, ENDPOINT_HANDLE, connection_create_endpoint, CONNECTION_HANDLE, connection); diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/session.h b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/session.h index e154d0ede..a38321eff 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/session.h +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/inc/azure_uamqp_c/session.h @@ -83,7 +83,7 @@ MU_DEFINE_ENUM(SESSION_SEND_TRANSFER_RESULT, SESSION_SEND_TRANSFER_RESULT_VALUES MOCKABLE_FUNCTION(, int, session_send_disposition, LINK_ENDPOINT_HANDLE, link_endpoint, DISPOSITION_HANDLE, disposition); MOCKABLE_FUNCTION(, int, session_send_detach, LINK_ENDPOINT_HANDLE, link_endpoint, DETACH_HANDLE, detach); MOCKABLE_FUNCTION(, SESSION_SEND_TRANSFER_RESULT, session_send_transfer, LINK_ENDPOINT_HANDLE, link_endpoint, TRANSFER_HANDLE, transfer, PAYLOAD*, payloads, size_t, payload_count, delivery_number*, delivery_id, ON_SEND_COMPLETE, on_send_complete, void*, callback_context); - + MOCKABLE_FUNCTION(, CONNECTION_HANDLE, session_get_connection, SESSION_HANDLE, session); #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c index 7feefd765..a25a3d99d 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c @@ -2481,6 +2481,23 @@ int connection_set_remote_idle_timeout_empty_frame_send_ratio(CONNECTION_HANDLE return result; } +char *connection_get_container_id(CONNECTION_HANDLE connection) +{ + char* result; + + if (connection == NULL) + { + LogError("NULL connection"); + result = NULL; + } + else + { + result = connection->container_id; + } + + return result; +} + ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE connection_subscribe_on_connection_close_received(CONNECTION_HANDLE connection, ON_CONNECTION_CLOSE_RECEIVED on_connection_close_received, void* context) { ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE result; diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c index f867911c4..7b0d0e0d6 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/link.c @@ -753,6 +753,7 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ result->target = amqpvalue_clone(target); result->session = session; result->handle = 0; + result->name = NULL; result->snd_settle_mode = sender_settle_mode_unsettled; result->rcv_settle_mode = receiver_settle_mode_first; result->delivery_count = 0; @@ -801,11 +802,12 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ } else { + (void)memcpy(result->name, name, name_length + 1); + result->on_link_state_changed = NULL; result->callback_context = NULL; set_link_state(result, LINK_STATE_DETACHED); - (void)memcpy(result->name, name, name_length + 1); result->link_endpoint = session_create_link_endpoint(session, name); if (result->link_endpoint == NULL) { @@ -906,6 +908,11 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND result->on_link_state_changed = NULL; result->callback_context = NULL; result->link_endpoint = link_endpoint; + + // This ensures link.c gets notified if the link endpoint is destroyed + // by uamqp (due to a DETACH from the hub, e.g.) to prevent a double free. + session_set_link_endpoint_callback( + result->link_endpoint, on_link_endpoint_destroyed, result); } } } diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/session.c b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/session.c index c35d21c3c..ab1d0802f 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/session.c +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/session.c @@ -1027,6 +1027,25 @@ int session_end(SESSION_HANDLE session, const char* condition_value, const char* return result; } +CONNECTION_HANDLE session_get_connection(SESSION_HANDLE session) +{ + CONNECTION_HANDLE result; + + if (session == NULL) + { + result = NULL; + } + else + { + SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session; + + result = session_instance->connection; + } + + return result; + +} + int session_set_incoming_window(SESSION_HANDLE session, uint32_t incoming_window) { int result;