diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index a4ccdd5ca..127c3084b 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed a potential deadlock where a message receiver Open call could block indefinitely when adding the Link to the Pollables. + ### Other Changes ## 1.0.0-beta.8 (2024-04-09) diff --git a/sdk/core/azure-core-amqp/src/amqp/link.cpp b/sdk/core/azure-core-amqp/src/amqp/link.cpp index b7d32a795..75614595b 100644 --- a/sdk/core/azure-core-amqp/src/amqp/link.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/link.cpp @@ -466,11 +466,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { LinkImpl* link = static_cast(context); if (link->m_eventHandler) { -#if defined(BUILD_TESTING) - link->m_eventHandler->OnLinkFlowOn(Link{link->shared_from_this()}); -#else link->m_eventHandler->OnLinkFlowOn(link->shared_from_this()); -#endif } } namespace { diff --git a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp index 2762bb5d3..05660ad79 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp @@ -475,46 +475,55 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { m_session, static_cast(m_source.GetAddress()), context); } - auto lock{m_session->GetConnection()->Lock()}; - - // Once we've authenticated the connection, establish the link and receiver. - // We cannot do this before authenticating the client. - if (!m_link) { - CreateLink(); - } - if (m_messageReceiver == nullptr) - { - m_messageReceiver.reset(messagereceiver_create( - *m_link, MessageReceiverImpl::OnMessageReceiverStateChangedFn, this)); - } + auto lock{m_session->GetConnection()->Lock()}; - messagereceiver_set_trace(m_messageReceiver.get(), m_options.EnableTrace); + // Once we've authenticated the connection, establish the link and receiver. + // We cannot do this before authenticating the client. + if (!m_link) + { + CreateLink(); + } + if (m_messageReceiver == nullptr) + { + m_messageReceiver.reset(messagereceiver_create( + *m_link, MessageReceiverImpl::OnMessageReceiverStateChangedFn, this)); + } - if (messagereceiver_open( - m_messageReceiver.get(), MessageReceiverImpl::OnMessageReceivedFn, this)) - { + messagereceiver_set_trace(m_messageReceiver.get(), m_options.EnableTrace); - auto err = errno; + if (messagereceiver_open( + m_messageReceiver.get(), MessageReceiverImpl::OnMessageReceivedFn, this)) + { + + auto err = errno; #if defined(AZ_PLATFORM_WINDOWS) - char buf[256]; - strerror_s(buf, sizeof(buf), err); + char buf[256]; + strerror_s(buf, sizeof(buf), err); #else - std::string buf{strerror(err)}; + std::string buf{strerror(err)}; #endif - throw std::runtime_error( - "Could not open message receiver. errno=" + std::to_string(err) + ", \"" + buf + "\"."); - } - m_receiverOpen = true; + throw std::runtime_error( + "Could not open message receiver. errno=" + std::to_string(err) + ", \"" + buf + "\"."); + } + m_receiverOpen = true; - if (m_options.EnableTrace) - { - Log::Stream(Logger::Level::Verbose) << "Opening message receiver. Start async"; + if (m_options.EnableTrace) + { + Log::Stream(Logger::Level::Verbose) << "Opening message receiver. Start async"; + } + // Mark the connection as async so that we can use the async APIs. + m_session->GetConnection()->EnableAsyncOperation(true); } - // Mark the connection as async so that we can use the async APIs. - m_session->GetConnection()->EnableAsyncOperation(true); // And add the link to the list of pollable items. + // + // Note that you *cannot* hold any connection or link locks when calling AddPollable. This is + // because the the AddPollable function attempts to lock the pollable and the RemovePollable + // function blocks until any pollables have completed while holding the pollable lock. + // + // This can result in a deadlock because the polling thread is also going to acquire the + // connection lock resulting in a deadlock. Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link); } diff --git a/sdk/core/azure-core-amqp/src/common/global_state.cpp b/sdk/core/azure-core-amqp/src/common/global_state.cpp index e2dfa5b5a..468fe35e7 100644 --- a/sdk/core/azure-core-amqp/src/common/global_state.cpp +++ b/sdk/core/azure-core-amqp/src/common/global_state.cpp @@ -141,6 +141,19 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace #endif } + /** + * @brief Adds a pollable object to the list of objects to be polled. + * + * @param pollable The pollable object to add. + * + * @note Note that you *cannot* hold any connection or link locks when calling AddPollable. This + * is because the the AddPollable function attempts to lock the pollable and the RemovePollable + * function blocks until any pollables have completed while holding the pollable lock. + * + * This can result in a deadlock because the polling thread is also going to acquire the + * connection lock resulting in a deadlock. + * + */ void GlobalStateHolder::AddPollable(std::shared_ptr pollable) { std::lock_guard lock(m_pollablesMutex); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp index 7e3a5ebc7..05c3a6338 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp @@ -144,6 +144,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { // Block until all the events have been processed. waitGroup.Wait(); + producerClient.Close(context); + // And wait until all the threads have completed. for (auto& thread : processEventsThreads) { @@ -212,6 +214,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { ProcessEventsForLoadBalancerTestSingleThreaded(producerClient, partitionClient, context); } + + producerClient.Close(context); + // Stop the processor, we're done with the test. processor.Stop(); } @@ -307,7 +312,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { } catch (std::runtime_error& ex) { - GTEST_LOG_(FATAL) << "Exception thrown sending messages" << ex.what(); + GTEST_LOG_(ERROR) << "Exception thrown sending messages" << ex.what(); } }); std::vector> allEvents; @@ -340,7 +345,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { } catch (std::runtime_error const& ex) { - GTEST_LOG_(FATAL) << "Exception thrown receiving messages." << ex.what(); + GTEST_LOG_(ERROR) << "Exception thrown receiving messages." << ex.what(); producerContext.Cancel(); } } @@ -384,7 +389,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { } catch (std::runtime_error& ex) { - GTEST_LOG_(FATAL) << "Exception thrown sending messages" << ex.what(); + GTEST_LOG_(ERROR) << "Exception thrown sending messages" << ex.what(); } std::vector> allEvents; @@ -413,7 +418,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { } catch (std::runtime_error const& ex) { - GTEST_LOG_(FATAL) << "Exception thrown receiving messages." << ex.what(); + GTEST_LOG_(ERROR) << "Exception thrown receiving messages." << ex.what(); producerContext.Cancel(); } }