Fixed test deadlock in processor test (#5513)

* Fixed test deadlock in processor test

* Updated changelog; Changed fatal to error in test logs

* Update sdk/core/azure-core-amqp/CHANGELOG.md

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>

---------

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>
This commit is contained in:
Larry Osterman 2024-04-12 13:34:20 -07:00 committed by GitHub
parent e2f0e0a50b
commit 3f6543d2a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 37 deletions

View File

@ -8,6 +8,8 @@
### Bugs Fixed
- Fixed a potential deadlock where a message receiver Open call could block indefinitely when adding the Link to the Pollables.
### Other Changes
## 1.0.0-beta.8 (2024-04-09)

View File

@ -466,11 +466,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
LinkImpl* link = static_cast<LinkImpl*>(context);
if (link->m_eventHandler)
{
#if defined(BUILD_TESTING)
link->m_eventHandler->OnLinkFlowOn(Link{link->shared_from_this()});
#else
link->m_eventHandler->OnLinkFlowOn(link->shared_from_this());
#endif
}
}
namespace {

View File

@ -475,46 +475,55 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
m_session, static_cast<std::string>(m_source.GetAddress()), context);
}
auto lock{m_session->GetConnection()->Lock()};
// Once we've authenticated the connection, establish the link and receiver.
// We cannot do this before authenticating the client.
if (!m_link)
{
CreateLink();
}
if (m_messageReceiver == nullptr)
{
m_messageReceiver.reset(messagereceiver_create(
*m_link, MessageReceiverImpl::OnMessageReceiverStateChangedFn, this));
}
auto lock{m_session->GetConnection()->Lock()};
messagereceiver_set_trace(m_messageReceiver.get(), m_options.EnableTrace);
// Once we've authenticated the connection, establish the link and receiver.
// We cannot do this before authenticating the client.
if (!m_link)
{
CreateLink();
}
if (m_messageReceiver == nullptr)
{
m_messageReceiver.reset(messagereceiver_create(
*m_link, MessageReceiverImpl::OnMessageReceiverStateChangedFn, this));
}
if (messagereceiver_open(
m_messageReceiver.get(), MessageReceiverImpl::OnMessageReceivedFn, this))
{
messagereceiver_set_trace(m_messageReceiver.get(), m_options.EnableTrace);
auto err = errno;
if (messagereceiver_open(
m_messageReceiver.get(), MessageReceiverImpl::OnMessageReceivedFn, this))
{
auto err = errno;
#if defined(AZ_PLATFORM_WINDOWS)
char buf[256];
strerror_s(buf, sizeof(buf), err);
char buf[256];
strerror_s(buf, sizeof(buf), err);
#else
std::string buf{strerror(err)};
std::string buf{strerror(err)};
#endif
throw std::runtime_error(
"Could not open message receiver. errno=" + std::to_string(err) + ", \"" + buf + "\".");
}
m_receiverOpen = true;
throw std::runtime_error(
"Could not open message receiver. errno=" + std::to_string(err) + ", \"" + buf + "\".");
}
m_receiverOpen = true;
if (m_options.EnableTrace)
{
Log::Stream(Logger::Level::Verbose) << "Opening message receiver. Start async";
if (m_options.EnableTrace)
{
Log::Stream(Logger::Level::Verbose) << "Opening message receiver. Start async";
}
// Mark the connection as async so that we can use the async APIs.
m_session->GetConnection()->EnableAsyncOperation(true);
}
// Mark the connection as async so that we can use the async APIs.
m_session->GetConnection()->EnableAsyncOperation(true);
// And add the link to the list of pollable items.
//
// Note that you *cannot* hold any connection or link locks when calling AddPollable. This is
// because the the AddPollable function attempts to lock the pollable and the RemovePollable
// function blocks until any pollables have completed while holding the pollable lock.
//
// This can result in a deadlock because the polling thread is also going to acquire the
// connection lock resulting in a deadlock.
Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link);
}

View File

@ -141,6 +141,19 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
#endif
}
/**
* @brief Adds a pollable object to the list of objects to be polled.
*
* @param pollable The pollable object to add.
*
* @note Note that you *cannot* hold any connection or link locks when calling AddPollable. This
* is because the the AddPollable function attempts to lock the pollable and the RemovePollable
* function blocks until any pollables have completed while holding the pollable lock.
*
* This can result in a deadlock because the polling thread is also going to acquire the
* connection lock resulting in a deadlock.
*
*/
void GlobalStateHolder::AddPollable(std::shared_ptr<Pollable> pollable)
{
std::lock_guard<std::mutex> lock(m_pollablesMutex);

View File

@ -144,6 +144,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
// Block until all the events have been processed.
waitGroup.Wait();
producerClient.Close(context);
// And wait until all the threads have completed.
for (auto& thread : processEventsThreads)
{
@ -212,6 +214,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
ProcessEventsForLoadBalancerTestSingleThreaded(producerClient, partitionClient, context);
}
producerClient.Close(context);
// Stop the processor, we're done with the test.
processor.Stop();
}
@ -307,7 +312,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
}
catch (std::runtime_error& ex)
{
GTEST_LOG_(FATAL) << "Exception thrown sending messages" << ex.what();
GTEST_LOG_(ERROR) << "Exception thrown sending messages" << ex.what();
}
});
std::vector<std::shared_ptr<const Models::ReceivedEventData>> allEvents;
@ -340,7 +345,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
}
catch (std::runtime_error const& ex)
{
GTEST_LOG_(FATAL) << "Exception thrown receiving messages." << ex.what();
GTEST_LOG_(ERROR) << "Exception thrown receiving messages." << ex.what();
producerContext.Cancel();
}
}
@ -384,7 +389,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
}
catch (std::runtime_error& ex)
{
GTEST_LOG_(FATAL) << "Exception thrown sending messages" << ex.what();
GTEST_LOG_(ERROR) << "Exception thrown sending messages" << ex.what();
}
std::vector<std::shared_ptr<const Models::ReceivedEventData>> allEvents;
@ -413,7 +418,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
}
catch (std::runtime_error const& ex)
{
GTEST_LOG_(FATAL) << "Exception thrown receiving messages." << ex.what();
GTEST_LOG_(ERROR) << "Exception thrown receiving messages." << ex.what();
producerContext.Cancel();
}
}