Fixed #5536, fixed a test deadlock discovered while testing the fix for 5536, added some diagnostics for uAMQP to help track the problem. (#5651)

* Fixed 5536, cleaned up a bunch of test bugs found during testing of 5536 fix
This commit is contained in:
Larry Osterman 2024-05-22 11:39:03 -07:00 committed by GitHub
parent abd34abacf
commit 4fcf09d4ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 246 additions and 107 deletions

View File

@ -66,6 +66,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
};
#if defined(_azure_TESTING_BUILD)
// Note that this entire class is a test hook to enable testing of the Link family of apis. It is
// not exposed to customers because there are no customer scenarios for it.
class Link;
class LinkImplEvents;
class LinkImplEventsImpl;

View File

@ -23,6 +23,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
class MessageReceiverFactory;
}}}} // namespace Azure::Core::Amqp::_detail
#if defined(_azure_TESTING_BUILD)
namespace Azure { namespace Core { namespace Amqp { namespace Tests { namespace MessageTests {
class MockServiceEndpoint;
}}}}} // namespace Azure::Core::Amqp::Tests::MessageTests
#endif
namespace Azure { namespace Core { namespace Amqp { namespace _internal {
enum class MessageReceiverState
{
@ -180,5 +186,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
MessageReceiver(std::shared_ptr<_detail::MessageReceiverImpl> impl) : m_impl{impl} {}
friend class _detail::MessageReceiverFactory;
std::shared_ptr<_detail::MessageReceiverImpl> m_impl;
#if _azure_TESTING_BUILD
friend class Azure::Core::Amqp::Tests::MessageTests::MockServiceEndpoint;
// There is a deadlock associated with the link polling if it is enabled from an AMQP event
// callback. To work around this, link polling is disabled when creating a message receiver from
// an existing link endpoint. This method should be called to enable it at a time when it is
// safer to enable link polling.
// This is a test hook and should not be used outside of test code.
void EnableLinkPolling();
#endif
};
}}}} // namespace Azure::Core::Amqp::_internal

View File

@ -7,6 +7,7 @@
#include "../models/private/performatives/transfer_impl.hpp"
#include "../models/private/value_impl.hpp"
#include "azure/core/amqp/internal/common/completion_operation.hpp"
#include "azure/core/amqp/internal/common/global_state.hpp"
#include "azure/core/amqp/internal/models/message_source.hpp"
#include "azure/core/amqp/internal/models/message_target.hpp"
#include "private/link_impl.hpp"
@ -147,7 +148,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
uint32_t Link::GetReceivedMessageId() const { return m_impl->GetReceivedMessageId(); }
void Link::Attach() { return m_impl->Attach(); }
void Link::Attach()
{
Azure::Core::Amqp::Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(
m_impl);
return m_impl->Attach();
}
std::tuple<uint32_t, LinkDeliverySettleReason, Models::AmqpValue> Link::Transfer(
std::vector<uint8_t> const& payload,
@ -162,7 +168,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
std::string const& errorDescription,
Models::AmqpValue const& info)
{
return m_impl->Detach(close, errorCondition, errorDescription, info);
m_impl->Detach(close, errorCondition, errorDescription, info);
Azure::Core::Amqp::Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable(
m_impl);
}
#endif // _azure_TESTING_BUILD
@ -196,6 +204,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
return os;
}
std::ostream& operator<<(std::ostream& os, LINK_TRANSFER_RESULT transfer)
{
switch (transfer)
{
case LINK_TRANSFER_BUSY:
os << "LINK_TRANSFER_BUSY";
break;
case LINK_TRANSFER_ERROR:
os << "LINK_TRANSFER_ERROR";
break;
default:
os << "Unknown (" << transfer << ")";
break;
}
return os;
}
/****/
/* LINK Implementation */
@ -210,6 +235,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
Models::AmqpValue sourceValue{source.AsAmqpValue()};
Models::AmqpValue targetValue(target.AsAmqpValue());
auto connectionLock{m_session->GetConnection()->Lock()};
m_link = link_create(
*session,
name.c_str(),
@ -230,6 +256,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
Models::AmqpValue sourceValue(source.AsAmqpValue());
Models::AmqpValue targetValue(target.AsAmqpValue());
auto connectionLock{m_session->GetConnection()->Lock()};
m_link = link_create_from_endpoint(
*session,
LinkEndpointFactory::Release(linkEndpoint),
@ -645,7 +672,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
0 /*timeout*/);
if (asyncResult == nullptr)
{
throw std::runtime_error("Could not send message");
std::stringstream ss;
ss << "Could not send message: " << transferResult;
throw std::runtime_error(ss.str());
}
}

View File

@ -129,6 +129,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
return stream;
}
#if defined(_azure_TESTING_BUILD)
void MessageReceiver::EnableLinkPolling()
{
if (m_impl)
{
m_impl->EnableLinkPolling();
}
else
{
AZURE_ASSERT_FALSE("MessageReceiver::EnableLinkPolling called on moved message receiver.");
}
}
#endif
}}}} // namespace Azure::Core::Amqp::_internal
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
@ -160,6 +174,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
*m_link, MessageReceiverImpl::OnMessageReceiverStateChangedFn, this));
messagereceiver_set_trace(m_messageReceiver.get(), options.EnableTrace);
// When creating a message receiver from a link endpoint, we don't want to enable polling on the
// link at open time (because the Open call is made with the ConnectionLock held, resulting in a
// deadlock.
//
// Instead, we'll defer the link polling until after MessageReceiver is opened and it's safe to
// do so.
m_deferLinkPolling = true;
}
void MessageReceiverImpl::CreateLink(LinkEndpoint& endpoint)
@ -335,6 +357,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
}
void MessageReceiverImpl::EnableLinkPolling()
{
std::unique_lock<std::mutex> lock{m_mutableState};
if (!m_linkPollingEnabled)
{
Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link);
m_linkPollingEnabled = true;
}
}
MessageReceiverImpl::~MessageReceiverImpl() noexcept
{
auto lock{m_session->GetConnection()->Lock()};
@ -512,6 +544,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
Log::Stream(Logger::Level::Verbose) << "Opening message receiver. Start async";
}
// Mark the connection as async so that we can use the async APIs.
m_session->GetConnection()->EnableAsyncOperation(true);
}
@ -524,7 +557,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
//
// This can result in a deadlock because the polling thread is also going to acquire the
// connection lock resulting in a deadlock.
Common::_detail::GlobalStateHolder::GlobalStateInstance()->AddPollable(m_link);
// If we're not deferring link polling, enable the async operation on the connection.
if (!m_deferLinkPolling)
{
EnableLinkPolling();
}
}
void MessageReceiverImpl::Close(Context const& context)
@ -541,9 +578,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
bool shouldWaitForClose = m_currentState == _internal::MessageReceiverState::Closing
|| m_currentState == _internal::MessageReceiverState::Open;
Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable(
m_link); // This will ensure that the link is cleaned up on the next poll()
{
std::unique_lock<std::mutex> lock{m_mutableState};
if (m_linkPollingEnabled)
{
Common::_detail::GlobalStateHolder::GlobalStateInstance()->RemovePollable(
m_link); // This will ensure that the link is cleaned up on the next poll()
m_linkPollingEnabled = false;
}
}
{
auto lock{m_session->GetConnection()->Lock()};

View File

@ -392,7 +392,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
auto result = m_openQueue.WaitForResult(context);
if (!result || std::get<0>(*result))
{
if (m_options.EnableTrace)
{
Log::Stream(Logger::Level::Verbose) << "Opening message sender. Enable async operation.";

View File

@ -70,6 +70,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
std::pair<std::shared_ptr<Models::AmqpMessage>, Models::_internal::AmqpError>
TryWaitForIncomingMessage();
void EnableLinkPolling();
private:
UniqueMessageReceiver m_messageReceiver{};
bool m_receiverOpen{false};
@ -79,6 +81,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
std::shared_ptr<_detail::SessionImpl> m_session;
Models::_internal::AmqpError m_savedMessageError{};
_internal::MessageReceiverState m_currentState{};
bool m_deferLinkPolling{false};
bool m_linkPollingEnabled{false};
std::mutex m_mutableState;
Azure::Core::Amqp::Common::_internal::
AsyncOperationQueue<std::shared_ptr<Models::AmqpMessage>, Models::_internal::AmqpError>

View File

@ -99,7 +99,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
else
{
os << "Error {null}";
os << "Error: {No Error}";
}
return os;
}

View File

@ -3,21 +3,13 @@
#include "azure/core/amqp/internal/common/async_operation_queue.hpp"
#include "azure/core/amqp/internal/connection.hpp"
#include "azure/core/amqp/internal/message_receiver.hpp"
#include "azure/core/amqp/internal/message_sender.hpp"
#include "azure/core/amqp/internal/models/messaging_values.hpp"
#include "azure/core/amqp/internal/models/performatives/amqp_transfer.hpp"
#include "azure/core/amqp/internal/network/amqp_header_detect_transport.hpp"
#include "azure/core/amqp/internal/network/socket_listener.hpp"
#include "azure/core/amqp/internal/network/socket_transport.hpp"
#include "azure/core/amqp/internal/session.hpp"
#include "mock_amqp_server.hpp"
#include <azure/core/platform.hpp>
#include <functional>
#include <memory>
#include <random>
#include <gtest/gtest.h>
@ -329,6 +321,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
return std::get<0>(*result);
}
void WaitForLinkState(LinkState state, Azure::Core::Context const& context)
{
LinkState result;
do
{
result = WaitForLink(context);
GTEST_LOG_(INFO) << "Link state changed to: " << result;
} while (result != state);
}
private:
void OnLinkFlowOn(Link const& link) override
{
@ -357,7 +359,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<LinkState> m_linkStateQueue;
};
Azure::Core::Context timeoutContext = Azure::Core::Context::ApplicationContext.WithDeadline(
Azure::DateTime::clock::now() + std::chrono::seconds(60));
Link keepAliveLink{
session, "KeepConnectionAlive", SessionRole::Receiver, "MyTarget", "TestReceiver"};
keepAliveLink.Attach();
@ -367,49 +370,37 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
Link link(session, "MySession", SessionRole::Sender, "MySource", "MyTarget", &linkEvents);
link.Attach();
LinkState linkState;
// Iterate until the state changes to Attached.
do
{
linkState = linkEvents.WaitForLink({});
} while (linkState != LinkState::Attached);
linkEvents.WaitForLinkState(LinkState::Attached, timeoutContext);
Models::AmqpMessage message;
message.SetBody("Hello");
link.Transfer(Models::AmqpMessage::Serialize(message), {});
link.Transfer(Models::AmqpMessage::Serialize(message), timeoutContext);
Azure::Core::Amqp::Models::AmqpValue data;
link.Detach(true, {}, {}, data);
// Iterate until the state changes to Detached.
linkEvents.WaitForLinkState(LinkState::Detached, timeoutContext);
}
{
ClientLinkEvents linkEvents;
Link link(session, "MySession2", SessionRole::Sender, "MySource", "MyTarget", &linkEvents);
link.Attach();
// Iterate until the state changes to Attached.
linkEvents.WaitForLinkState(LinkState::Attached, timeoutContext);
Azure::Core::Amqp::Models::AmqpValue data;
link.Detach(true, {}, {}, data);
// Iterate until the state changes to Detached.
do
{
linkState = linkEvents.WaitForLink({});
} while (linkState != LinkState::Detached);
linkEvents.WaitForLinkState(LinkState::Detached, timeoutContext);
}
{
ClientLinkEvents linkEvents;
Link link(session, "MySession2", SessionRole::Sender, "MySource", "MyTarget", &linkEvents);
link.Attach();
LinkState linkState;
// Iterate until the state changes to Attached.
do
{
linkState = linkEvents.WaitForLink({});
} while (linkState != LinkState::Attached);
Azure::Core::Amqp::Models::AmqpValue data;
link.Detach(true, {}, {}, data);
// Iterate until the state changes to Attached.
do
{
linkState = linkEvents.WaitForLink({});
} while (linkState != LinkState::Detached);
}
{
constexpr const size_t linkCount = 20;
@ -417,7 +408,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
std::vector<std::unique_ptr<ClientLinkEvents>> linkEvents;
for (size_t i = 0; i < linkCount; i += 1)
{
// Create 100 links on the session.
// Create linkCount links on the session.
linkEvents.push_back(std::make_unique<ClientLinkEvents>());
links.push_back(Link{
session,
@ -429,24 +420,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
}
for (size_t i = 0; i < linkCount; i += 1)
{
links[i].Attach(); // Iterate until the state changes to Attached.
LinkState linkState;
// Wait for the links to attach.
do
{
linkState = linkEvents[i]->WaitForLink({});
} while (linkState != LinkState::Attached);
links[i].Attach();
// Iterate until the state changes to Attached.
linkEvents[i]->WaitForLinkState(LinkState::Attached, timeoutContext);
}
for (size_t i = 0; i < linkCount; i += 1)
{
links[i].Detach(true, "", "", Models::AmqpValue{});
// Iterate until the state changes to Detached.
LinkState linkState;
do
{
linkState = linkEvents[i]->WaitForLink({});
} while (linkState != LinkState::Detached);
linkEvents[i]->WaitForLinkState(LinkState::Detached, timeoutContext);
}
}

View File

@ -5,12 +5,10 @@
#include "azure/core/amqp/internal/models/amqp_error.hpp"
#include "azure/core/amqp/models/amqp_message.hpp"
#include <azure/core/amqp/internal/claims_based_security.hpp>
#include <azure/core/amqp/internal/connection.hpp>
#include <azure/core/amqp/internal/link.hpp>
#include <azure/core/amqp/internal/message_receiver.hpp>
#include <azure/core/amqp/internal/message_sender.hpp>
#include <azure/core/amqp/internal/models/amqp_protocol.hpp>
#include <azure/core/amqp/internal/models/message_source.hpp>
#include <azure/core/amqp/internal/models/message_target.hpp>
#include <azure/core/amqp/internal/models/messaging_values.hpp>
@ -91,7 +89,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
senderOptions.InitialDeliveryCount = 0;
m_sender[linkName] = std::make_unique<Azure::Core::Amqp::_internal::MessageSender>(
session.CreateMessageSender(linkEndpoint, target, senderOptions, this));
(void)!m_sender[linkName]->HalfOpen();
// NOTE: The linkEndpoint needs to be attached before this function returns in order to
// correctly process incoming attach requests. Otherwise, the attach request will be
// discarded, and the link will be in a half attached state.
(void)!m_sender[linkName]->HalfOpen(m_listenerContext);
}
else
{
@ -117,8 +118,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
receiverOptions.InitialDeliveryCount = 0;
m_receiver[linkName] = std::make_unique<Azure::Core::Amqp::_internal::MessageReceiver>(
session.CreateMessageReceiver(linkEndpoint, source, receiverOptions, this));
GTEST_LOG_(INFO) << "Open new message receiver.";
m_receiver[linkName]->Open();
// NOTE: The linkEndpoint needs to be attached before this function returns in order to
// correctly process incoming attach requests. Otherwise, the attach request will be
// discarded, and the link will be in a half attached state.
// Note that there is a potential deadlock when opening the message receiver - the
// connection lock cannot be held when link polling is enabled on the incoming link.
// This is because the link polling will try to acquire the connection lock, which is
// already held by the current thread. To avoid this deadlock, we defer enabling link
// polling until later when it is safe to do so.
m_receiver[linkName]->Open(m_listenerContext);
// Now that the receiver is open, we can enable link polling from the message loop.
m_receiverPollingEnableQueue.CompleteOperation(linkName);
}
else
{
@ -247,6 +260,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
Azure::Core::Context& GetListenerContext() { return m_listenerContext; }
Azure::Core::Amqp::Models::AmqpValue OnMessageReceived(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> const& message) override
{
GTEST_LOG_(INFO) << "MockServiceEndpoint(" << m_name << ") Received a message " << *message;
m_messageQueue.CompleteOperation(receiver.GetLinkName(), message);
return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryAccepted();
}
private:
Azure::Core::Context m_listenerContext; // Used to cancel the listener if necessary.
bool m_enableTrace{true};
@ -256,6 +278,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
std::map<std::string, std::unique_ptr<Azure::Core::Amqp::_internal::MessageReceiver>>
m_receiver;
Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<std::string>
m_receiverPollingEnableQueue;
Azure::Core::Amqp::Common::_internal::
AsyncOperationQueue<std::string, std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage>>
m_messageQueue;
@ -283,6 +307,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
MessageReceived(std::get<0>(*message), std::get<1>(*message));
}
auto senderDisconnected = m_messageSenderDisconnectedQueue.TryWaitForResult();
if (senderDisconnected)
{
@ -305,6 +330,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
receiver->Close(m_listenerContext);
}
auto receiverPollingEnable = m_receiverPollingEnableQueue.TryWaitForResult();
if (receiverPollingEnable)
{
std::string receiverName = std::get<0>(*receiverPollingEnable);
GTEST_LOG_(INFO) << "Enable link polling for receiver: " << receiverName;
m_receiver[receiverName]->EnableLinkPolling();
}
if (m_receiver.empty() && m_sender.empty())
{
GTEST_LOG_(INFO) << "No more links, exiting message loop.";
@ -326,17 +359,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
<< " New state: " << newState;
}
protected:
Azure::Core::Amqp::Models::AmqpValue OnMessageReceived(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> const& message) override
{
GTEST_LOG_(INFO) << "MockServiceEndpoint(" << m_name << ") Received a message " << *message;
m_messageQueue.CompleteOperation(receiver.GetLinkName(), message);
return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryAccepted();
}
private:
virtual void OnMessageReceiverDisconnected(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
Azure::Core::Amqp::Models::_internal::AmqpError const& error) override
@ -554,10 +576,6 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
m_serviceEndpoints.push_back(endpoint);
}
private:
std::vector<std::shared_ptr<MockServiceEndpoint>> m_serviceEndpoints;
public:
uint16_t GetPort() const { return m_testPort; }
Azure::Core::Context& GetListenerContext() { return m_listenerContext; }
@ -649,29 +667,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
void EnableTrace(bool enableTrace) { m_enableTrace = enableTrace; }
private:
// The set of incoming connections, used when tearing down the mock server.
std::list<std::shared_ptr<Azure::Core::Amqp::_internal::Connection>> m_connections;
// The set of sessions.
std::list<std::shared_ptr<Azure::Core::Amqp::_internal::Session>> m_sessions;
// bool m_connectionValid{false};
bool m_enableTrace{true};
bool m_listening{false};
std::string m_connectionId;
std::thread m_serverThread;
std::uint16_t m_testPort;
protected:
// For each incoming message source, we create a queue of messages intended for that
// message source.
//
// Each message queue is keyed by the message-id.
// std::map < std::string, MessageLinkComponents> m_linkMessageQueues;
Azure::Core::Context m_listenerContext; // Used to cancel the listener if necessary.
// Inherited from SocketListenerEvents
virtual void OnSocketAccepted(
std::shared_ptr<Azure::Core::Amqp::Network::_internal::Transport> transport) override
@ -764,6 +760,24 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
GTEST_LOG_(INFO) << "Unknown endpoint name: " << endpointName;
return false;
}
private:
std::vector<std::shared_ptr<MockServiceEndpoint>> m_serviceEndpoints;
// The set of incoming connections, used when tearing down the mock server.
std::list<std::shared_ptr<Azure::Core::Amqp::_internal::Connection>> m_connections;
// The set of sessions.
std::list<std::shared_ptr<Azure::Core::Amqp::_internal::Session>> m_sessions;
bool m_enableTrace{true};
bool m_listening{false};
std::string m_connectionId;
std::thread m_serverThread;
std::uint16_t m_testPort;
Azure::Core::Context m_listenerContext; // Used to cancel the listener if necessary.
};
} // namespace MessageTests
}}}} // namespace Azure::Core::Amqp::Tests

View File

@ -92,6 +92,7 @@ extern "C" {
MOCKABLE_FUNCTION(, int, connection_get_properties, CONNECTION_HANDLE, connection, fields*, properties);
MOCKABLE_FUNCTION(, int, connection_get_remote_max_frame_size, CONNECTION_HANDLE, connection, uint32_t*, remote_max_frame_size);
MOCKABLE_FUNCTION(, int, connection_set_remote_idle_timeout_empty_frame_send_ratio, CONNECTION_HANDLE, connection, double, idle_timeout_empty_frame_send_ratio);
MOCKABLE_FUNCTION(, char*, connection_get_container_id, CONNECTION_HANDLE, connection);
MOCKABLE_FUNCTION(, uint64_t, connection_handle_deadlines, CONNECTION_HANDLE, connection);
MOCKABLE_FUNCTION(, void, connection_dowork, CONNECTION_HANDLE, connection);
MOCKABLE_FUNCTION(, ENDPOINT_HANDLE, connection_create_endpoint, CONNECTION_HANDLE, connection);

View File

@ -83,7 +83,7 @@ MU_DEFINE_ENUM(SESSION_SEND_TRANSFER_RESULT, SESSION_SEND_TRANSFER_RESULT_VALUES
MOCKABLE_FUNCTION(, int, session_send_disposition, LINK_ENDPOINT_HANDLE, link_endpoint, DISPOSITION_HANDLE, disposition);
MOCKABLE_FUNCTION(, int, session_send_detach, LINK_ENDPOINT_HANDLE, link_endpoint, DETACH_HANDLE, detach);
MOCKABLE_FUNCTION(, SESSION_SEND_TRANSFER_RESULT, session_send_transfer, LINK_ENDPOINT_HANDLE, link_endpoint, TRANSFER_HANDLE, transfer, PAYLOAD*, payloads, size_t, payload_count, delivery_number*, delivery_id, ON_SEND_COMPLETE, on_send_complete, void*, callback_context);
MOCKABLE_FUNCTION(, CONNECTION_HANDLE, session_get_connection, SESSION_HANDLE, session);
#ifdef __cplusplus
}
#endif /* __cplusplus */

View File

@ -2481,6 +2481,23 @@ int connection_set_remote_idle_timeout_empty_frame_send_ratio(CONNECTION_HANDLE
return result;
}
char *connection_get_container_id(CONNECTION_HANDLE connection)
{
char* result;
if (connection == NULL)
{
LogError("NULL connection");
result = NULL;
}
else
{
result = connection->container_id;
}
return result;
}
ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE connection_subscribe_on_connection_close_received(CONNECTION_HANDLE connection, ON_CONNECTION_CLOSE_RECEIVED on_connection_close_received, void* context)
{
ON_CONNECTION_CLOSED_EVENT_SUBSCRIPTION_HANDLE result;

View File

@ -753,6 +753,7 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ
result->target = amqpvalue_clone(target);
result->session = session;
result->handle = 0;
result->name = NULL;
result->snd_settle_mode = sender_settle_mode_unsettled;
result->rcv_settle_mode = receiver_settle_mode_first;
result->delivery_count = 0;
@ -801,11 +802,12 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ
}
else
{
(void)memcpy(result->name, name, name_length + 1);
result->on_link_state_changed = NULL;
result->callback_context = NULL;
set_link_state(result, LINK_STATE_DETACHED);
(void)memcpy(result->name, name, name_length + 1);
result->link_endpoint = session_create_link_endpoint(session, name);
if (result->link_endpoint == NULL)
{
@ -906,6 +908,11 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND
result->on_link_state_changed = NULL;
result->callback_context = NULL;
result->link_endpoint = link_endpoint;
// This ensures link.c gets notified if the link endpoint is destroyed
// by uamqp (due to a DETACH from the hub, e.g.) to prevent a double free.
session_set_link_endpoint_callback(
result->link_endpoint, on_link_endpoint_destroyed, result);
}
}
}

View File

@ -1027,6 +1027,25 @@ int session_end(SESSION_HANDLE session, const char* condition_value, const char*
return result;
}
CONNECTION_HANDLE session_get_connection(SESSION_HANDLE session)
{
CONNECTION_HANDLE result;
if (session == NULL)
{
result = NULL;
}
else
{
SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)session;
result = session_instance->connection;
}
return result;
}
int session_set_incoming_window(SESSION_HANDLE session, uint32_t incoming_window)
{
int result;