Port uAMQPPython fixes to uAMQP to vendored uAMQP (#5059)

* Port uAMQPPython fixes to uAMQP to vendored uAMQP

* Fixed unit test failures

* Diff ported fixes against python codebase
This commit is contained in:
Larry Osterman 2023-10-27 09:17:26 -07:00 committed by GitHub
parent 2337ee796e
commit 102c6f3d95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 619 additions and 35 deletions

View File

@ -139,6 +139,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
*/
std::map<std::string, AmqpValue> ApplicationProperties;
/** @brief Delivery Tag for the message.
*
* For more information, see [AMQP Application
* Properties](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties).
*/
AmqpValue DeliveryTag;
/** @brief Footer for the message.
*
* For more information, see [AMQP

View File

@ -11,7 +11,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
static Models::AmqpValue DeliveryAccepted();
static Models::AmqpValue DeliveryRejected(
std::string const& errorCondition,
std::string const& errorDescription);
std::string const& errorDescription,
Models::AmqpValue const& errorInformation);
static Models::AmqpValue DeliveryReleased();
static Models::AmqpValue DeliveryModified(
bool deliveryFailed,

View File

@ -312,6 +312,25 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
}
void LinkImpl::SetDesiredCapabilities(Models::AmqpValue desiredCapabilities)
{
if (link_set_desired_capabilities(m_link, desiredCapabilities))
{
throw std::runtime_error("Could not set desired capabilities.");
}
}
Models::AmqpValue LinkImpl::GetDesiredCapabilities() const
{
AMQP_VALUE desiredCapabilitiesVal;
if (link_get_desired_capabilities(m_link, &desiredCapabilitiesVal))
{
throw std::runtime_error("Could not convert field to header.");
}
return Models::AmqpValue{desiredCapabilitiesVal};
}
void LinkImpl::SubscribeToDetachEvent(OnLinkDetachEvent onLinkDetach)
{
m_onLinkDetachEvent = std::move(onLinkDetach);
@ -336,6 +355,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}
}
void LinkImpl::ResetLinkCredit(std::uint32_t linkCredit, bool drain)
{
if (link_reset_link_credit(m_link, linkCredit, drain))
{
throw std::runtime_error("Could not reset link credit.");
}
}
void LinkImpl::Attach()
{
if (link_attach(m_link, nullptr, nullptr, nullptr, this))
@ -359,5 +386,4 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
throw std::runtime_error("Could not set attach properties.");
}
}
}}}} // namespace Azure::Core::Amqp::_detail

View File

@ -451,7 +451,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
m_messageQueue.CompleteOperation(
_internal::ManagementOperationStatus::Error, 500, error, Models::AmqpMessage());
return Models::_internal::Messaging::DeliveryRejected(condition, description);
return Models::_internal::Messaging::DeliveryRejected(condition, description, {});
}
Models::AmqpValue ManagementClientImpl::OnMessageReceived(

View File

@ -239,7 +239,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
return Models::_internal::Messaging::DeliveryRejected(
Models::_internal::AmqpErrorCondition::ConnectionForced.ToString(),
"Message Receiver is closed.");
"Message Receiver is closed.",
{});
}
Models::AmqpValue MessageReceiverImpl::OnMessageReceived(Models::AmqpMessage message)

View File

@ -61,6 +61,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
void SetAttachProperties(Models::AmqpValue attachProperties);
void SetMaxLinkCredit(uint32_t maxLinkCredit);
void SetDesiredCapabilities(Models::AmqpValue desiredCapabilities);
Models::AmqpValue GetDesiredCapabilities() const;
/** @brief Subscribe to link detach events. */
void SubscribeToDetachEvent(OnLinkDetachEvent onLinkDetachEvent);
void UnsubscribeFromDetachEvent();
@ -74,6 +77,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
std::shared_ptr<_detail::SessionImpl> const& GetSession() const { return m_session; }
void ResetLinkCredit(std::uint32_t linkCredit, bool drain);
void Attach();
void Detach(

View File

@ -154,6 +154,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
}
}
}
{
AMQP_VALUE deliveryTagVal;
if (!message_get_delivery_tag(message, &deliveryTagVal))
{
UniqueAmqpValueHandle deliveryTag(deliveryTagVal);
rv.DeliveryTag = AmqpValue{deliveryTag.get()};
}
}
{
annotations footerVal;
if (!message_get_footer(message, &footerVal) && footerVal)
@ -256,6 +264,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
throw std::runtime_error("Could not set delivery annotations.");
}
}
if (!message.MessageAnnotations.empty())
{
if (message_set_message_annotations(
@ -264,6 +273,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
throw std::runtime_error("Could not set message annotations.");
}
}
if (!message.ApplicationProperties.empty())
{
AmqpMap appProperties;
@ -285,6 +295,16 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
throw std::runtime_error("Could not set application properties.");
}
}
if (!message.DeliveryTag.IsNull())
{
if (message_set_delivery_tag(
rv.get(), static_cast<UniqueAmqpValueHandle>(message.DeliveryTag).get()))
{
throw std::runtime_error("Could not set delivery tag.");
}
}
if (!message.Footer.empty())
{
if (message_set_footer(rv.get(), static_cast<UniqueAmqpValueHandle>(message.Footer).get()))
@ -749,6 +769,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models {
os << "{" << val.first << ", " << val.second << "}";
}
}
if (!message.DeliveryTag.IsNull())
{
os << ", deliveryTag=" << message.DeliveryTag;
}
if (!message.Footer.empty())
{
os << std::endl << " Footer: ";

View File

@ -37,11 +37,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace Models { namespace
}
Models::AmqpValue Messaging::DeliveryRejected(
std::string const& errorCondition,
std::string const& errorDescription)
std::string const& errorDescription,
AmqpValue const& errorInfo)
{
auto rv = messaging_delivery_rejected(
errorCondition.empty() ? nullptr : errorCondition.c_str(),
errorDescription.empty() ? nullptr : errorDescription.c_str());
errorDescription.empty() ? nullptr : errorDescription.c_str(),
errorInfo);
if (!rv)
{
throw std::runtime_error("Could not allocate delivery rejected described value.");

View File

@ -102,7 +102,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
{
GTEST_LOG_(INFO) << "Rejecting message because it is for an unexpected node name.";
auto rv = Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected(
"test:Rejected", "Unknown message source.");
"test:Rejected", "Unknown message source.", {});
GTEST_LOG_(INFO) << "RV=" << rv;
return rv;
}
@ -112,7 +112,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
{
GTEST_LOG_(INFO) << "Rejecting message because it is for an unknown operation.";
auto rv = Azure::Core::Amqp::Models::_internal::Messaging::DeliveryRejected(
"amqp:status:rejected", "Unknown Request operation");
"amqp:status:rejected", "Unknown Request operation", {});
GTEST_LOG_(INFO) << "RV=" << rv;
return rv;
}

View File

@ -64,7 +64,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
{
auto accepted{Models::_internal::Messaging::DeliveryAccepted()};
auto released{Models::_internal::Messaging::DeliveryReleased()};
auto rejected{Models::_internal::Messaging::DeliveryRejected("error", "description")};
auto rejected{Models::_internal::Messaging::DeliveryRejected("error", "description", {})};
auto modified{Models::_internal::Messaging::DeliveryModified(true, false, "Annotations")};
auto received{Models::_internal::Messaging::DeliveryReceived(3, 24)};
}

View File

@ -69,6 +69,7 @@ extern "C" {
MOCKABLE_FUNCTION(, int, amqpvalue_get_list_item_count, AMQP_VALUE, list, uint32_t*, count);
MOCKABLE_FUNCTION(, int, amqpvalue_set_list_item, AMQP_VALUE, list, uint32_t, index, AMQP_VALUE, list_item_value);
MOCKABLE_FUNCTION(, AMQP_VALUE, amqpvalue_get_list_item, AMQP_VALUE, list, size_t, index);
MOCKABLE_FUNCTION(, int, amqpvalue_get_list, AMQP_VALUE, from_value, AMQP_VALUE*, list);
MOCKABLE_FUNCTION(, AMQP_VALUE, amqpvalue_create_map);
MOCKABLE_FUNCTION(, int, amqpvalue_set_map_value, AMQP_VALUE, map, AMQP_VALUE, key, AMQP_VALUE, value);
MOCKABLE_FUNCTION(, AMQP_VALUE, amqpvalue_get_map_value, AMQP_VALUE, map, AMQP_VALUE, key);

View File

@ -67,6 +67,8 @@ MOCKABLE_FUNCTION(, int, link_set_max_message_size, LINK_HANDLE, link, uint64_t,
MOCKABLE_FUNCTION(, int, link_get_max_message_size, LINK_HANDLE, link, uint64_t*, max_message_size);
MOCKABLE_FUNCTION(, int, link_get_peer_max_message_size, LINK_HANDLE, link, uint64_t*, peer_max_message_size);
MOCKABLE_FUNCTION(, int, link_set_attach_properties, LINK_HANDLE, link, fields, attach_properties);
MOCKABLE_FUNCTION(, int, link_set_desired_capabilities, LINK_HANDLE, link, AMQP_VALUE, desired_capabilities);
MOCKABLE_FUNCTION(, int, link_get_desired_capabilities, LINK_HANDLE, link, AMQP_VALUE*, desired_capabilities);
MOCKABLE_FUNCTION(, int, link_set_max_link_credit, LINK_HANDLE, link, uint32_t, max_link_credit);
MOCKABLE_FUNCTION(, int, link_get_name, LINK_HANDLE, link, const char**, link_name);
MOCKABLE_FUNCTION(, int, link_get_received_message_id, LINK_HANDLE, link, delivery_number*, message_id);
@ -75,6 +77,7 @@ MOCKABLE_FUNCTION(, int, link_attach, LINK_HANDLE, link, ON_TRANSFER_RECEIVED, o
MOCKABLE_FUNCTION(, int, link_detach, LINK_HANDLE, link, bool, close, const char*, error_condition, const char*, error_description, AMQP_VALUE, info);
MOCKABLE_FUNCTION(, ASYNC_OPERATION_HANDLE, link_transfer_async, LINK_HANDLE, handle, message_format, message_format, PAYLOAD*, payloads, size_t, payload_count, ON_DELIVERY_SETTLED, on_delivery_settled, void*, callback_context, LINK_TRANSFER_RESULT*, link_transfer_result,tickcounter_ms_t, timeout);
MOCKABLE_FUNCTION(, void, link_dowork, LINK_HANDLE, link);
MOCKABLE_FUNCTION(, int, link_reset_link_credit, LINK_HANDLE, link, uint32_t, link_credit, bool, drain);
MOCKABLE_FUNCTION(, ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE, link_subscribe_on_link_detach_received, LINK_HANDLE, link, ON_LINK_DETACH_RECEIVED, on_link_detach_received, void*, context);
MOCKABLE_FUNCTION(, void, link_unsubscribe_on_link_detach_received, ON_LINK_DETACH_EVENT_SUBSCRIPTION_HANDLE, event_subscription);

View File

@ -67,6 +67,8 @@ extern "C" {
MOCKABLE_FUNCTION(, int, message_get_body_type, MESSAGE_HANDLE, message, MESSAGE_BODY_TYPE*, body_type);
MOCKABLE_FUNCTION(, int, message_set_message_format, MESSAGE_HANDLE, message, uint32_t, message_format);
MOCKABLE_FUNCTION(, int, message_get_message_format, MESSAGE_HANDLE, message, uint32_t*, message_format);
MOCKABLE_FUNCTION(, int, message_set_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE, delivery_tag_value);
MOCKABLE_FUNCTION(, int, message_get_delivery_tag, MESSAGE_HANDLE, message, AMQP_VALUE*, delivery_tag_value);
#ifdef __cplusplus
}

View File

@ -20,7 +20,7 @@ extern "C" {
MOCKABLE_FUNCTION(, AMQP_VALUE, messaging_delivery_received, uint32_t, section_number, uint64_t, section_offset);
MOCKABLE_FUNCTION(, AMQP_VALUE, messaging_delivery_accepted);
MOCKABLE_FUNCTION(, AMQP_VALUE, messaging_delivery_rejected, const char*, error_condition, const char*, error_description);
MOCKABLE_FUNCTION(, AMQP_VALUE, messaging_delivery_rejected, const char*, error_condition, const char*, error_description, fields, error_info);
MOCKABLE_FUNCTION(, AMQP_VALUE, messaging_delivery_released);
MOCKABLE_FUNCTION(, AMQP_VALUE, messaging_delivery_modified, bool, delivery_failed, bool, undeliverable_here, fields, message_annotations);

View File

@ -85,7 +85,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve application properties");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get application properties on AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not get application properties on AMQP management response.", NULL);
}
else
{
@ -98,7 +98,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve message properties");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get message properties on AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not get message properties on AMQP management response.", NULL);
}
else
{
@ -117,7 +117,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve correlation Id");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get correlation Id from AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not get correlation Id from AMQP management response.", NULL);
}
else
{
@ -127,7 +127,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve correlation Id ulong value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get correlation Id from AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not get correlation Id from AMQP management response.", NULL);
}
else
{
@ -140,7 +140,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve application property map");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not get application property map from the application properties in the AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not get application property map from the application properties in the AMQP management response.", NULL);
}
else
{
@ -165,7 +165,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve status code from application properties");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not retrieve status code from the application properties in the AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not retrieve status code from the application properties in the AMQP management response.", NULL);
}
else
{
@ -177,7 +177,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not retrieve status code int value");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_136: [ When `on_message_received` fails due to errors in parsing the response message `on_message_received` shall call `messaging_delivery_rejected` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not retrieve status code value from the application properties in the AMQP management response.");
result = messaging_delivery_rejected("amqp:internal-error", "Could not retrieve status code value from the application properties in the AMQP management response.", NULL);
}
else
{
@ -203,8 +203,17 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
desc_value = amqpvalue_get_map_value(map, desc_key);
if (desc_value != NULL)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
AMQP_TYPE amqp_type = amqpvalue_get_type(desc_value);
if (amqp_type == AMQP_TYPE_STRING)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_134: [ The status description value shall be extracted from the value found in the map by using `amqpvalue_get_string`. ]*/
if (amqpvalue_get_string(desc_value, &status_description) != 0)
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
}
}
else
{
/* Codes_SRS_AMQP_MANAGEMENT_01_125: [ If status description is not found, NULL shall be passed to the user callback as `status_description` argument. ]*/
status_description = NULL;
@ -308,7 +317,7 @@ static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE messag
LogError("Could not match AMQP management response to request");
amqp_management->on_amqp_management_error(amqp_management->on_amqp_management_error_context);
/* Codes_SRS_AMQP_MANAGEMENT_01_135: [ When an error occurs in creating AMQP values (for status code, etc.) `on_message_received` shall call `messaging_delivery_released` and return the created delivery AMQP value. ]*/
result = messaging_delivery_rejected("amqp:internal-error", "Could not match AMQP management response to request");
result = messaging_delivery_rejected("amqp:internal-error", "Could not match AMQP management response to request", NULL);
}
else
{

View File

@ -1506,6 +1506,35 @@ AMQP_VALUE amqpvalue_get_list_item(AMQP_VALUE value, size_t index)
return result;
}
int amqpvalue_get_list(AMQP_VALUE value, AMQP_VALUE* list_value)
{
int result;
if ((value == NULL) ||
(list_value == NULL))
{
LogError("Bad arguments: value = %p, list_value = %p",
value, list_value);
result = MU_FAILURE;
}
else
{
AMQP_VALUE_DATA* value_data = (AMQP_VALUE_DATA*)value;
if (value_data->type != AMQP_TYPE_LIST)
{
LogError("Value is not of type LIST");
result = MU_FAILURE;
}
else
{
*list_value = value;
result = 0;
}
}
return result;
}
/* Codes_SRS_AMQPVALUE_01_178: [amqpvalue_create_map shall create an AMQP value that holds a map and return a handle to it.] */
/* Codes_SRS_AMQPVALUE_01_031: [1.6.23 map A polymorphic mapping from distinct keys to values.] */
AMQP_VALUE amqpvalue_create_map(void)

View File

@ -57,10 +57,11 @@ typedef struct LINK_INSTANCE_TAG
sequence_no initial_delivery_count;
uint64_t max_message_size;
uint64_t peer_max_message_size;
uint32_t current_link_credit;
int32_t current_link_credit;
uint32_t max_link_credit;
uint32_t available;
fields attach_properties;
AMQP_VALUE desired_capabilities;
bool is_underlying_session_begun;
bool is_closed;
unsigned char* received_payload;
@ -279,6 +280,15 @@ static int send_attach(LINK_INSTANCE* link, const char* name, handle handle, rol
(void)attach_set_properties(attach, link->attach_properties);
}
if (link->desired_capabilities != NULL)
{
if(attach_set_desired_capabilities(attach, link->desired_capabilities) != 0)
{
LogError("Cannot set attach desired capabilities");
result = MU_FAILURE;
}
}
if (role == role_sender)
{
if (attach_set_initial_delivery_count(attach, link->delivery_count) != 0)
@ -422,12 +432,6 @@ static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t
bool more;
bool is_error;
if (link_instance->current_link_credit == 0)
{
link_instance->current_link_credit = link_instance->max_link_credit;
send_flow(link_instance);
}
more = false;
/* Attempt to get more flag, default to false */
(void)transfer_get_more(transfer_handle, &more);
@ -749,6 +753,7 @@ LINK_HANDLE link_create(SESSION_HANDLE session, const char* name, role role, AMQ
result->is_underlying_session_begun = false;
result->is_closed = false;
result->attach_properties = NULL;
result->desired_capabilities = NULL;
result->received_payload = NULL;
result->received_payload_size = 0;
result->received_delivery_id = 0;
@ -838,6 +843,7 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND
result->is_underlying_session_begun = false;
result->is_closed = false;
result->attach_properties = NULL;
result->desired_capabilities = NULL;
result->received_payload = NULL;
result->received_payload_size = 0;
result->received_delivery_id = 0;
@ -1116,6 +1122,59 @@ int link_get_peer_max_message_size(LINK_HANDLE link, uint64_t* peer_max_message_
return result;
}
int link_get_desired_capabilities(LINK_HANDLE link, AMQP_VALUE* desired_capabilities)
{
int result;
if((link == NULL) ||
(desired_capabilities == NULL))
{
LogError("Bad arguments: link = %p, desired_capabilities = %p",
link, desired_capabilities);
result = MU_FAILURE;
}
else
{
AMQP_VALUE link_desired_capabilties = amqpvalue_clone(link->desired_capabilities);
if(link_desired_capabilties == NULL)
{
LogError("Failed to clone link desired capabilities");
result = MU_FAILURE;
}
else
{
*desired_capabilities = link_desired_capabilties;
result = 0;
}
}
return result;
}
int link_set_desired_capabilities(LINK_HANDLE link, AMQP_VALUE desired_capabilities)
{
int result;
if (link == NULL)
{
LogError("NULL link");
result = MU_FAILURE;
}
else
{
link->desired_capabilities = amqpvalue_clone(desired_capabilities);
if (link->desired_capabilities == NULL)
{
LogError("Failed cloning desired capabilities");
result = MU_FAILURE;
}
else
{
result = 0;
}
}
return result;
}
int link_set_attach_properties(LINK_HANDLE link, fields attach_properties)
{
int result;
@ -1159,6 +1218,74 @@ int link_set_max_link_credit(LINK_HANDLE link, uint32_t max_link_credit)
return result;
}
int link_reset_link_credit(LINK_HANDLE link, uint32_t link_credit, bool drain)
{
int result;
FLOW_HANDLE flow;
LINK_INSTANCE* link_instance = (LINK_INSTANCE*)link;
if (link == NULL)
{
result = MU_FAILURE;
}
else
{
if(link_instance->role == role_sender)
{
LogError("Sender is not allowed to reset link credit");
result = MU_FAILURE;
}
else
{
link->current_link_credit = link_credit;
flow = flow_create(0, 0, 0);
if (flow == NULL)
{
LogError("NULL flow performative");
result = MU_FAILURE;
}
else
{
if (flow_set_link_credit(flow, link->current_link_credit) != 0)
{
LogError("Cannot set link credit on flow performative");
result = MU_FAILURE;
}
else if (flow_set_handle(flow, link->handle) != 0)
{
LogError("Cannot set handle on flow performative");
result = MU_FAILURE;
}
else if (flow_set_delivery_count(flow, link->delivery_count) != 0)
{
LogError("Cannot set delivery count on flow performative");
result = MU_FAILURE;
}
else if (drain && flow_set_drain(flow, drain) != 0)
{
LogError("Cannot set drain on flow performative");
result = MU_FAILURE;
}
else
{
if (session_send_flow(link->link_endpoint, flow) != 0)
{
LogError("Sending flow frame failed in session send");
result = MU_FAILURE;
}
else
{
result = 0;
}
}
flow_destroy(flow);
}
}
}
return result;
}
int link_attach(LINK_HANDLE link, ON_TRANSFER_RECEIVED on_transfer_received, ON_LINK_STATE_CHANGED on_link_state_changed, ON_LINK_FLOW_ON on_link_flow_on, void* callback_context)
{
int result;
@ -1608,6 +1735,12 @@ void link_dowork(LINK_HANDLE link)
{
tickcounter_ms_t current_tick;
if (link->current_link_credit <= 0)
{
link->current_link_credit = link->max_link_credit;
send_flow(link);
}
if (tickcounter_get_current_ms(link->tick_counter, &current_tick) != 0)
{
LogError("Cannot get tick counter value");

View File

@ -31,6 +31,7 @@ typedef struct MESSAGE_INSTANCE_TAG
application_properties application_properties;
annotations footer;
uint32_t message_format;
AMQP_VALUE delivery_tag;
} MESSAGE_INSTANCE;
MESSAGE_BODY_TYPE internal_get_body_type(MESSAGE_HANDLE message)
@ -120,6 +121,7 @@ MESSAGE_HANDLE message_create(void)
result->body_amqp_value = NULL;
result->body_amqp_sequence_items = NULL;
result->body_amqp_sequence_count = 0;
result->delivery_tag = NULL;
/* Codes_SRS_MESSAGE_01_135: [ By default a message on which `message_set_message_format` was not called shall have message format set to 0. ]*/
result->message_format = 0;
@ -230,6 +232,17 @@ MESSAGE_HANDLE message_clone(MESSAGE_HANDLE source_message)
}
}
if ((result != NULL) && (source_message->delivery_tag != NULL))
{
result->delivery_tag = amqpvalue_clone(source_message->delivery_tag);
if (result->delivery_tag == NULL)
{
LogError("Cannot clone message delivery tag");
message_destroy(result);
result = NULL;
}
}
if ((result != NULL) && (source_message->body_amqp_data_count > 0))
{
size_t i;
@ -376,6 +389,11 @@ void message_destroy(MESSAGE_HANDLE message)
amqpvalue_destroy(message->body_amqp_value);
}
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}
/* Codes_SRS_MESSAGE_01_136: [ If the message body is made of several AMQP data items, they shall all be freed. ]*/
free_all_body_data_items(message);
@ -1448,3 +1466,89 @@ int message_get_message_format(MESSAGE_HANDLE message, uint32_t *message_format)
return result;
}
int message_set_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE delivery_tag_value)
{
int result;
if (message == NULL)
{
LogError("NULL message");
result = MU_FAILURE;
}
else
{
if (delivery_tag_value == NULL)
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
message->delivery_tag = NULL;
}
/* Codes_SRS_MESSAGE_01_053: [ On success it shall return 0. ]*/
result = 0;
}
else
{
AMQP_VALUE new_delivery_tag = amqpvalue_clone(delivery_tag_value);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = MU_FAILURE;
}
else
{
if (message->delivery_tag != NULL)
{
amqpvalue_destroy(message->delivery_tag);
}
message->delivery_tag = new_delivery_tag;
/* Codes_SRS_MESSAGE_01_102: [ On success it shall return 0. ]*/
result = 0;
}
}
}
return result;
}
int message_get_delivery_tag(MESSAGE_HANDLE message, AMQP_VALUE *delivery_tag_value)
{
int result;
if ((message == NULL) ||
(delivery_tag_value == NULL))
{
LogError("Bad arguments: message = %p, delivery_tag = %p",
message, delivery_tag_value);
result = MU_FAILURE;
}
else
{
if (message->delivery_tag == NULL)
{
*delivery_tag_value = NULL;
result = 0;
}
else
{
AMQP_VALUE new_delivery_tag = amqpvalue_clone(message->delivery_tag);
if (new_delivery_tag == NULL)
{
LogError("Cannot clone delivery tag");
result = MU_FAILURE;
}
else
{
*delivery_tag_value = new_delivery_tag;
result = 0;
}
}
}
return result;
}

View File

@ -218,6 +218,50 @@ static void decode_message_value_callback(void* context, AMQP_VALUE decoded_valu
}
}
}
else if (is_amqp_sequence_type_by_descriptor(descriptor))
{
MESSAGE_BODY_TYPE body_type;
if (message_get_body_type(decoded_message, &body_type) != 0)
{
LogError("Error getting message body type");
message_receiver->decode_error = true;
}
else
{
if ((body_type != MESSAGE_BODY_TYPE_NONE) &&
(body_type != MESSAGE_BODY_TYPE_SEQUENCE))
{
LogError("Message body type already set to something different than AMQP SEQUENCE");
message_receiver->decode_error = true;
}
else
{
AMQP_VALUE body_amqp_sequence = amqpvalue_get_inplace_described_value(decoded_value);
if (body_amqp_sequence == NULL)
{
LogError("Error getting body AMQP sequence");
message_receiver->decode_error = true;
}
else
{
AMQP_VALUE sequence_value;
if (amqpvalue_get_amqp_sequence(body_amqp_sequence, &sequence_value) != 0)
{
LogError("Error getting body SEQUENCE AMQP value");
message_receiver->decode_error = true;
}
else
{
if (message_add_body_amqp_sequence(decoded_message, sequence_value) != 0)
{
LogError("Error setting body AMQP sequence on received message");
message_receiver->decode_error = true;
}
}
}
}
}
}
else
{
LogError("Failed decoding descriptor");
@ -230,7 +274,6 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
AMQP_VALUE result = NULL;
MESSAGE_RECEIVER_INSTANCE* message_receiver = (MESSAGE_RECEIVER_INSTANCE*)context;
(void)transfer;
if (message_receiver->on_message_received != NULL)
{
MESSAGE_HANDLE message = message_create();
@ -241,7 +284,31 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
}
else
{
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
delivery_tag received_message_tag;
message_format message_format_value;
AMQP_VALUE delivery_tag_value;
AMQPVALUE_DECODER_HANDLE amqpvalue_decoder;
if (transfer_get_message_format(transfer, &message_format_value) == 0)
{
message_set_message_format(message, message_format_value);
}
if (transfer_get_delivery_tag(transfer, &received_message_tag) == 0)
{
delivery_tag_value = amqpvalue_create_delivery_tag(received_message_tag);
if ((delivery_tag_value != NULL) && (message_set_delivery_tag(message, delivery_tag_value) != 0))
{
LogError("Could not set message delivery tag");
set_message_receiver_state(message_receiver, MESSAGE_RECEIVER_STATE_ERROR);
}
}
else
{
delivery_tag_value = NULL;
}
amqpvalue_decoder = amqpvalue_decoder_create(decode_message_value_callback, message_receiver);
if (amqpvalue_decoder == NULL)
{
LogError("Cannot create AMQP value decoder");
@ -271,7 +338,9 @@ static AMQP_VALUE on_transfer_received(void* context, TRANSFER_HANDLE transfer,
amqpvalue_decoder_destroy(amqpvalue_decoder);
}
if ( delivery_tag_value != NULL ) {
amqpvalue_destroy(delivery_tag_value);
}
message_destroy(message);
}
}

View File

@ -237,7 +237,10 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message
AMQP_VALUE application_properties_value = NULL;
AMQP_VALUE body_amqp_value = NULL;
size_t body_data_count = 0;
size_t body_sequence_count = 0;
AMQP_VALUE msg_annotations = NULL;
AMQP_VALUE footer = NULL;
AMQP_VALUE delivery_annotations = NULL;
bool is_error = false;
// message header
@ -331,6 +334,38 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message
}
}
// footer
if ((!is_error) &&
(message_get_footer(message, &footer) == 0) &&
(footer != NULL))
{
if (amqpvalue_get_encoded_size(footer, &encoded_size) != 0)
{
LogError("Cannot obtain footer encoded size");
is_error = true;
}
else
{
total_encoded_size += encoded_size;
}
}
// delivery annotations
if ((!is_error) &&
(message_get_delivery_annotations(message, &delivery_annotations) == 0) &&
(delivery_annotations != NULL))
{
if (amqpvalue_get_encoded_size(delivery_annotations, &encoded_size) != 0)
{
LogError("Cannot obtain delivery annotations encoded size");
is_error = true;
}
else
{
total_encoded_size += encoded_size;
}
}
if (is_error)
{
result = SEND_ONE_MESSAGE_ERROR;
@ -438,6 +473,62 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message
}
break;
}
case MESSAGE_BODY_TYPE_SEQUENCE:
{
AMQP_VALUE message_body_amqp_sequence;
size_t i;
if (message_get_body_amqp_sequence_count(message, &body_sequence_count) != 0)
{
LogError("Cannot get body AMQP sequence count");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (body_sequence_count == 0)
{
LogError("Body sequence count is zero");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
for (i = 0; i < body_sequence_count; i++)
{
if (message_get_body_amqp_sequence_in_place(message, i, &message_body_amqp_sequence) != 0)
{
LogError("Cannot get body AMQP sequence %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
AMQP_VALUE body_amqp_sequence;
body_amqp_sequence = amqpvalue_create_amqp_sequence(message_body_amqp_sequence);
if (body_amqp_sequence == NULL)
{
LogError("Cannot create body AMQP sequence");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (amqpvalue_get_encoded_size(body_amqp_sequence, &encoded_size) != 0)
{
LogError("Cannot get body AMQP sequence encoded size");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
total_encoded_size += encoded_size;
}
amqpvalue_destroy(body_amqp_sequence);
}
}
}
}
}
break;
}
}
if (result == 0)
@ -492,6 +583,28 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message
log_message_chunk(message_sender, "Application properties:", application_properties_value);
}
if ((result == SEND_ONE_MESSAGE_OK) && (footer != NULL))
{
if (amqpvalue_encode(footer, encode_bytes, &payload) != 0)
{
LogError("Cannot encode footer value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Footer:", footer);
}
if ((result == SEND_ONE_MESSAGE_OK) && (delivery_annotations != NULL))
{
if (amqpvalue_encode(delivery_annotations, encode_bytes, &payload) != 0)
{
LogError("Cannot encode delivery annotations value");
result = SEND_ONE_MESSAGE_ERROR;
}
log_message_chunk(message_sender, "Delivery annotations:", delivery_annotations);
}
if (result == SEND_ONE_MESSAGE_OK)
{
switch (message_body_type)
@ -551,6 +664,42 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message
}
break;
}
case MESSAGE_BODY_TYPE_SEQUENCE:
{
AMQP_VALUE message_body_amqp_sequence;
size_t i;
for (i = 0; i < body_sequence_count; i++)
{
if (message_get_body_amqp_sequence_in_place(message, i, &message_body_amqp_sequence) != 0)
{
LogError("Cannot get AMQP data %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
AMQP_VALUE body_amqp_sequence;
body_amqp_sequence = amqpvalue_create_amqp_sequence(message_body_amqp_sequence);
if (body_amqp_sequence == NULL)
{
LogError("Cannot create body AMQP sequence");
result = SEND_ONE_MESSAGE_ERROR;
}
else
{
if (amqpvalue_encode(body_amqp_sequence, encode_bytes, &payload) != 0)
{
LogError("Cannot encode body AMQP sequence %u", (unsigned int)i);
result = SEND_ONE_MESSAGE_ERROR;
break;
}
amqpvalue_destroy(body_amqp_sequence);
}
}
}
break;
}
}
}
@ -632,6 +781,16 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message
{
properties_destroy(properties);
}
if (footer != NULL)
{
annotations_destroy(footer);
}
if (delivery_annotations != NULL)
{
annotations_destroy(delivery_annotations);
}
}
return result;
@ -843,8 +1002,6 @@ int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
}
else
{
indicate_all_messages_as_error(message_sender);
if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPENING) ||
(message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN))
{
@ -864,6 +1021,8 @@ int messagesender_close(MESSAGE_SENDER_HANDLE message_sender)
{
result = 0;
}
indicate_all_messages_as_error(message_sender);
}
return result;

View File

@ -156,7 +156,7 @@ AMQP_VALUE messaging_delivery_accepted(void)
return result;
}
AMQP_VALUE messaging_delivery_rejected(const char* error_condition, const char* error_description)
AMQP_VALUE messaging_delivery_rejected(const char* error_condition, const char* error_description, fields error_info)
{
AMQP_VALUE result;
REJECTED_HANDLE rejected = rejected_create();
@ -188,11 +188,20 @@ AMQP_VALUE messaging_delivery_rejected(const char* error_condition, const char*
}
else
{
if (rejected_set_error(rejected, error_handle) != 0)
if((error_info != NULL) &&
(error_set_info(error_handle, error_info) != 0))
{
LogError("Cannot set error on REJECTED state handle");
LogError("Cannot set error info on error AMQP value for REJECTED state");
error_constructing = true;
}
else
{
if (rejected_set_error(rejected, error_handle) != 0)
{
LogError("Cannot set error on REJECTED state handle");
error_constructing = true;
}
}
}
error_destroy(error_handle);