From b108bf62354ec391407e05b5fd29fa6196f70f0d Mon Sep 17 00:00:00 2001 From: Larry Osterman Date: Thu, 17 Aug 2023 09:52:54 -0700 Subject: [PATCH] Post-release updates (#4871) * Added link credit support to message sender and receiver; reworked message receiver creation to move to the partition client object * Added round trip test, including filters; fixed a hang with the consumer client caused by no messages; fixed checkpoint store storage names. * EventDataBatch is created from EventProducer. Fixes #4868 * SendEventDataBatch renamed to Send; pass in Context on methods which require a context * Added producer client send APIs without requiring an explicit event data batch * close message receiver in destructor if it is open * Consumer client and producer client only create message sender and receiver when needed --- CMakePresets.json | 5 + sdk/core/azure-core-amqp/CHANGELOG.md | 2 + .../inc/azure/core/amqp/message_receiver.hpp | 5 + .../inc/azure/core/amqp/message_sender.hpp | 17 ++ .../src/amqp/message_receiver.cpp | 10 + .../src/amqp/message_sender.cpp | 9 + .../amqp/private/message_receiver_impl.hpp | 2 +- .../src/amqp/private/message_sender_impl.hpp | 2 + sdk/eventhubs/README.md | 7 - sdk/eventhubs/assets.json | 2 +- .../azure-messaging-eventhubs/CHANGELOG.md | 13 + .../azure-messaging-eventhubs/README.md | 116 +++++++- .../messaging/eventhubs/consumer_client.hpp | 14 +- .../messaging/eventhubs/event_data_batch.hpp | 74 +++-- .../eventhubs/eventhubs_exception.hpp | 2 +- .../eventhubs/models/management_models.hpp | 2 +- .../messaging/eventhubs/partition_client.hpp | 89 +++--- .../azure/messaging/eventhubs/processor.hpp | 1 - .../messaging/eventhubs/producer_client.hpp | 44 ++- .../samples/README.md | 20 ++ .../samples/consume-events/consume_events.cpp | 20 ++ .../samples/produce-events/produce_events.cpp | 19 +- .../produce-events/produce_events_aad.cpp | 19 +- .../src/checkpoint_store.cpp | 9 +- .../src/consumer_client.cpp | 194 +++---------- .../src/event_data_batch.cpp | 19 +- .../src/partition_client.cpp | 180 +++++++++++- .../src/private/eventhubs_utilities.hpp | 61 +++- .../src/producer_client.cpp | 273 +++++++++++------- .../test-resources.json | 9 +- .../eventhubs_stress_test.cpp | 6 +- .../test/eventhubs_batch_perf_test.hpp | 10 +- .../test/ut/CMakeLists.txt | 12 +- .../test/ut/checkpoint_store_test.cpp | 24 +- .../test/ut/consumer_client_test.cpp | 34 ++- .../test/ut/processor_test.cpp | 3 +- .../test/ut/producer_client_test.cpp | 44 ++- .../test/ut/round_trip_test.cpp | 155 ++++++++++ sdk/eventhubs/ci.yml | 6 + 39 files changed, 1081 insertions(+), 452 deletions(-) delete mode 100644 sdk/eventhubs/README.md create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp diff --git a/CMakePresets.json b/CMakePresets.json index ec2a09746..b20c5afbf 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -370,6 +370,11 @@ "inherits": [ "linux-basic-g++", "debug-build", "enable-tests" ], "displayName": "Linux c++ Debug+Tests" }, + { + "name": "linux-g++-debug-tests-samples", + "inherits": [ "linux-basic-g++", "debug-build", "enable-tests", "enable-samples" ], + "displayName": "Linux c++ Debug+Tests, samples" + }, { "name": "generate-doxygen", "displayName": "zz-Generate Doxygen", diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 95a5c25da..e127ca88b 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- When a message sender is destroyed, close the underlying AMQP link if it hasn't been closed already. + ### Other Changes ## 1.0.0-beta.2 (2023-08-04) diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp index 6c62e8960..cb790af43 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_receiver.hpp @@ -74,6 +74,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { /** @brief The Maximum message size for the link associated with the message receiver. */ Nullable MaxMessageSize; + /** @brief The default link credit used when communicating with the service. The link credit + * defines the maximum number of messages which can be outstanding between the service and the + * client. */ + uint32_t MaxLinkCredit{}; + /** @brief Attach properties for the link associated with the message receiver. */ Models::AmqpMap Properties; diff --git a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp index 3159d7214..0470a4267 100644 --- a/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp +++ b/sdk/core/azure-core-amqp/inc/azure/core/amqp/message_sender.hpp @@ -90,6 +90,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { /** @brief The Maximum message size for the link associated with the message sender. */ Nullable MaxMessageSize; + /** @brief The link maximum credits. + * + * Each message sent over a link reduces the link-credit by one. When the link-credit reaches + * zero, no more messages can be sent until the sender receives a disposition indicating that at + * least one message has been settled. The sender MAY send as many messages as it likes before + * receiving a disposition, but it MUST NOT send more messages than the link-credit. The sender + * MUST NOT send any messages after sending a disposition that indicates an error. + * + */ + uint32_t MaxLinkCredits{}; + /** @brief The initial delivery count for the link associated with the message. * * The delivery-count is initialized by the sender when a link endpoint is created, and is @@ -130,6 +141,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { */ void Close(); + /** @brief Returns the link negotiated maximum message size + * + * @return The negotiated maximum message size. + */ + std::uint64_t GetMaxMessageSize() const; + /** @brief Send a message synchronously to the target of the message sender. * * @param message The message to send. diff --git a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp index 92c7fc704..d5f9d4402 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_receiver.cpp @@ -133,6 +133,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { m_link->SetMaxMessageSize(std::numeric_limits::max()); } + if (m_options.MaxLinkCredit != 0) + { + m_link->SetMaxLinkCredit(m_options.MaxLinkCredit); + } m_link->SetAttachProperties(static_cast(m_options.Properties)); } @@ -194,6 +198,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { m_eventHandler = nullptr; } + if (m_receiverOpen) + { + Close(); + } } MessageReceiverState MessageReceiverStateFromLowLevel(MESSAGE_RECEIVER_STATE lowLevel) @@ -304,6 +312,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { "Could not open message receiver. errno=" + std::to_string(err) + ", \"" + buf + "\"."); // LCOV_EXCL_STOP } + m_receiverOpen = true; } void MessageReceiverImpl::Close() @@ -312,6 +321,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { throw std::runtime_error("Could not close message receiver"); // LCOV_EXCL_LINE } + m_receiverOpen = false; } std::string MessageReceiverImpl::GetLinkName() const diff --git a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp index 04fa4cc15..282ee990f 100644 --- a/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp +++ b/sdk/core/azure-core-amqp/src/amqp/message_sender.cpp @@ -51,6 +51,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal { return m_impl->QueueSend(message, onSendComplete, context); } + std::uint64_t MessageSender::GetMaxMessageSize() const { return m_impl->GetMaxMessageSize(); } + MessageSender::~MessageSender() noexcept {} }}}} // namespace Azure::Core::Amqp::_internal @@ -107,6 +109,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { m_target); PopulateLinkProperties(); } + void MessageSenderImpl::CreateLink() { m_link = std::make_shared<_detail::LinkImpl>( @@ -149,9 +152,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { { m_link->SetMaxMessageSize(std::numeric_limits::max()); } + if (m_options.MaxLinkCredits != 0) + { + m_link->SetMaxLinkCredit(m_options.MaxLinkCredits); + } m_link->SetSenderSettleMode(m_options.SettleMode); } + std::uint64_t MessageSenderImpl::GetMaxMessageSize() const { return m_link->GetMaxMessageSize(); } + _internal::MessageSenderState MessageSenderStateFromLowLevel(MESSAGE_SENDER_STATE lowLevel) { switch (lowLevel) diff --git a/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp index e3b13e85c..4f76585a3 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/message_receiver_impl.hpp @@ -67,13 +67,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { void Close(); std::string GetLinkName() const; std::string GetSourceName() const { return static_cast(m_source.GetAddress()); } - uint32_t GetReceivedMessageId(); std::pair, Models::_internal::AmqpError> WaitForIncomingMessage(Context const& context); private: UniqueMessageReceiver m_messageReceiver{}; + bool m_receiverOpen{false}; std::shared_ptr<_detail::LinkImpl> m_link; _internal::MessageReceiverOptions m_options; Models::_internal::MessageSource m_source; diff --git a/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp b/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp index 2956d655a..d07f3a4e9 100644 --- a/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp +++ b/sdk/core/azure-core-amqp/src/amqp/private/message_sender_impl.hpp @@ -62,6 +62,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail { Azure::Core::Amqp::_internal::MessageSender::MessageSendCompleteCallback onSendComplete, Context const& context); + std::uint64_t GetMaxMessageSize() const; + private: static void OnMessageSenderStateChangedFn( void* context, diff --git a/sdk/eventhubs/README.md b/sdk/eventhubs/README.md deleted file mode 100644 index 909d15063..000000000 --- a/sdk/eventhubs/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# Azure Messaging Librares for C++ - -The Azure Messaging EventHubs Client Library for C++ allows you to build applications against Microsoft Azure Messaging EventHubs service. For an overview of Azure Messaging, see [Introduction to Microsoft Azure Messaging](https://learn.microsoft.com/azure/messaging-services/). - -## Latest release - -Find the latest Messaging clients for C++ releases [here](https://azure.github.io/azure-sdk/releases/latest/cpp.html). diff --git a/sdk/eventhubs/assets.json b/sdk/eventhubs/assets.json index ae81886b3..28a533e27 100644 --- a/sdk/eventhubs/assets.json +++ b/sdk/eventhubs/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "cpp", "TagPrefix": "cpp/eventhubs", - "Tag": "cpp/eventhubs_3d146641d9" + "Tag": "cpp/eventhubs_ea4655bf2e" } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 3a4d415fb..c99dc7040 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -4,12 +4,25 @@ ### Features Added +- `ProducerClient` now has convenience methods for sending events without batching. +- Added `std::ostream` insertion operators for model types to simplify debugging. + ### Breaking Changes +- Storage names used for checkpoint store have been normalized to match behavior of other Azure SDK eventhubs packages. +- `EventDataBatch` object can no longer be directly created but instead must be created via `ProducerClient::CreateEventDataBatch`. +- `EventDataBatch::AddMessage` method has been renamed to `EventDataBatch::TryAddMessage` and it now returns false if the message will not fit. +- `SendEventDataBatch` method has been renamed to `Send` and it now returns a void (throwing an exception of the send fails). + ### Bugs Fixed +- Setting `PartitionClientOptions::StartPosition::EnqueuedTime` now works as expected. +- Internally restructured how AMQP senders and receivers are configured to simplify code and significantly improve reliability. + ### Other Changes +- Azure CLI examples added to README.md file. + ## 1.0.0-beta.1 (2023-08-08) ### Features Added diff --git a/sdk/eventhubs/azure-messaging-eventhubs/README.md b/sdk/eventhubs/azure-messaging-eventhubs/README.md index 3f150c4b9..db8eb7537 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/README.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/README.md @@ -31,6 +31,103 @@ vcpkg install azure-messaging-eventhubs-cpp - An [Event Hub namespace](https://docs.microsoft.com/azure/event-hubs/). - An Event Hub. You can create an event hub in your Event Hubs Namespace using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create), or the [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli). +#### Create a namespace using the Azure CLI + +Login to the CLI: +```pwsh +az login +``` + +Create a resource group: +```pwsh +az group create --name --location --subscription +``` + +This should output something like: +```json +{ + "id": "/subscriptions//resourceGroups/", + "location": "", + "managedBy": null, + "name": "", + "properties": { + "provisioningState": "Succeeded" + }, + "tags": null, + "type": "Microsoft.Resources/resourceGroups" +} +``` + +Create an EventHubs instance: +```pwsh + az eventhubs namespace create --resource-group --name --sku Standard --subscription + ``` + +This should output something like: + +```json +{ + "createdAt": "2023-08-10T18:41:54.19Z", + "disableLocalAuth": false, + "id": "/subscriptions//resourceGroups//providers/Microsoft.EventHub/namespaces/", + "isAutoInflateEnabled": false, + "kafkaEnabled": true, + "location": "West US", + "maximumThroughputUnits": 0, + "metricId": "REDACTED", + "minimumTlsVersion": "1.2", + "name": "", + "provisioningState": "Succeeded", + "publicNetworkAccess": "Enabled", + "resourceGroup": "", + "serviceBusEndpoint": "https://.servicebus.windows.net:443/", + "sku": { + "capacity": 1, + "name": "Standard", + "tier": "Standard" + }, + "status": "Active", + "tags": {}, + "type": "Microsoft.EventHub/Namespaces", + "updatedAt": "2023-08-10T18:42:41.343Z", + "zoneRedundant": false +} +``` + +Create an EventHub: + +```pwsh +az eventhubs eventhub create --resource-group --namespace-name --name +``` + +That should output something like: + +```json +{ + "createdAt": "2023-08-10T21:02:07.62Z", + "id": "/subscriptions//resourceGroups//providers/Microsoft.EventHub/namespaces//eventhubs/", + "location": "westus", + "messageRetentionInDays": 7, + "name": "", + "partitionCount": 4, + "partitionIds": [ + "0", + "1", + "2", + "3" + ], + "resourceGroup": "", + "retentionDescription": { + "cleanupPolicy": "Delete", + "retentionTimeInHours": 168 + }, + "status": "Active", + "type": "Microsoft.EventHub/namespaces/eventhubs", + "updatedAt": "2023-08-10T21:02:16.29Z" +} +``` + + ### Authenticate the client Event Hub clients are created using a credential from the [Azure Identity package][azure_identity_pkg], like [DefaultAzureCredential][default_azure_credential]. @@ -38,8 +135,8 @@ Alternatively, you can create a client using a connection string. #### Using a service principal - - ConsumerClient: [link](https://azure.github.io/azure-sdk-for-cpp/storage.html) - - ProducerClient: [link](https://azure.github.io/azure-sdk-for-cpp/storage.html) + - ConsumerClient: [link][consumer_client] + - ProducerClient: [link][producer_client] #### Using a connection string @@ -56,7 +153,7 @@ store events. Events are published to an event hub using an [event publisher](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-publishers). In this package, the event publisher is the [ProducerClient](https://azure.github.io/azure-sdk-for-cpp/storage.html) Events can be consumed from an event hub using an [event consumer](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-consumers). In this package there are two types for consuming events: -- The basic event consumer is the PartitionClient, in the [ConsumerClient](https://azure.github.io/azure-sdk-for-cpp/storage.html). This consumer is useful if you already known which partitions you want to receive from. +- The basic event consumer is the PartitionClient, in the [ConsumerClient][consumer_client]. This consumer is useful if you already known which partitions you want to receive from. - A distributed event consumer, which uses Azure Blobs for checkpointing and coordination. This is implemented in the [Processor](https://azure.github.io/azure-sdk-for-cpp/storage.html). The Processor is useful when you want to have the partition assignment be dynamically chosen, and balanced with other Processor instances. @@ -65,15 +162,16 @@ More information about Event Hubs features and terminology can be found here: [l # Examples - -Examples for various scenarios can be found on [azure.github.io](https://azure.github.io/azure-sdk-for-cpp/storage.html) or in the samples directory in our GitHub repo for -[EventHubs](https://github.com/Azure/azure-sdk-for-cpp/blob/main/sdk/eventhubs). +Examples for various scenarios can be found on [azure.github.io](https://azure.github.io/azure-sdk-for-cpp/eventhubs.html) or in the samples directory in our GitHub repo for +[EventHubs](https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs/azure-messaging-eventhubs/samples). ## Send events The following example shows how to send events to an event hub: ```cpp +#include + // Your Event Hubs namespace connection string is available in the Azure portal. std::string connectionString = ""; std::string eventHubName = ""; @@ -101,6 +199,9 @@ auto result = client.SendEventDataBatch(eventBatch); The following example shows how to receive events from partition 1 on an event hub: ```cpp +#include + + // Your Event Hubs namespace connection string is available in the Azure portal. std::string connectionString = ""; std::string eventHubName = ""; @@ -160,6 +261,9 @@ Azure SDK for C++ is licensed under the [MIT](https://github.com/Azure/azure-sdk [azure_sdk_for_cpp_contributing_developer_guide]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md#developer-guide [azure_sdk_for_cpp_contributing_pull_requests]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md#pull-requests +[consumer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/latest/class_azure_1_1_messaging_1_1_event_hubs_1_1_consumer_client.html +[producer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/1.0.0-beta.1/class_azure_1_1_messaging_1_1_event_hubs_1_1_producer_client.html + [source]: https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs [azure_identity_pkg]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-identity/latest/index.html [default_azure_credential]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-identity/latest/index.html#defaultazurecredential diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp index aca11d789..1ef8ca71a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp @@ -32,9 +32,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { */ Azure::Core::Http::Policies::RetryOptions RetryOptions{}; - /** @brief Maximum message size for messages being sent. */ - Azure::Nullable MaxMessageSize; - /** @brief Name of the consumer client. */ std::string Name{}; }; @@ -133,10 +130,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { * * @param partitionId targeted partition * @param options client options + * @param context The context for the operation can be used for request cancellation. */ PartitionClient CreatePartitionClient( - std::string partitionId, - PartitionClientOptions const& options = {}); + std::string const& partitionId, + PartitionClientOptions const& options = {}, + Azure::Core::Context const& context = {}); /**@brief GetEventHubProperties gets properties of an eventHub. This includes data * like name, and partitions. @@ -156,6 +155,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { Core::Context const& context = {}); private: + void EnsureSession(std::string const& partitionId = {}); + Azure::Core::Amqp::_internal::Session GetSession(std::string const& partitionId = {}); + /// The connection string for the Event Hubs namespace std::string m_connectionString; @@ -181,7 +183,5 @@ namespace Azure { namespace Messaging { namespace EventHubs { /// @brief The options used to configure the consumer client. ConsumerClientOptions m_consumerClientOptions; - - std::string GetStartExpression(Models::StartPosition const& startPosition); }; }}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/event_data_batch.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/event_data_batch.hpp index 6929a35e2..39dc51f8d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/event_data_batch.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/event_data_batch.hpp @@ -13,6 +13,10 @@ // cspell: words vbin +namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail { + class EventDataBatchFactory; +}}}} // namespace Azure::Messaging::EventHubs::_detail + namespace Azure { namespace Messaging { namespace EventHubs { /** @brief EventDataBatchOptions contains optional parameters for the @@ -27,7 +31,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { /** @brief MaxBytes overrides the max size (in bytes) for a batch. * By default CreateEventDataBatch will use the max message size provided by the service. */ - uint32_t MaxBytes = std::numeric_limits::max(); + Azure::Nullable MaxBytes; /** @brief PartitionKey is hashed to calculate the partition assignment.Messages and message * batches with the same PartitionKey are guaranteed to end up in the same partition. @@ -54,7 +58,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { std::mutex m_rwMutex; std::string m_partitionId; std::string m_partitionKey; - uint64_t m_maxBytes; + Azure::Nullable m_maxBytes; std::vector> m_marshalledMessages; // Annotation properties const uint32_t BatchedMessageFormat = 0x80013700; @@ -89,26 +93,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { return *this; } - /** @brief Event Data Batch constructor - * - * @param options Options settings for creating the data batch - */ - EventDataBatch(EventDataBatchOptions options = {}) - : m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey}, - m_maxBytes{options.MaxBytes ? options.MaxBytes : std::numeric_limits::max()}, - m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0} - { - if (!options.PartitionId.empty() && !options.PartitionKey.empty()) - { - throw std::runtime_error("Either PartionID or PartitionKey can be set, but not both."); - } - - if (options.PartitionId.empty()) - { - m_partitionId = anyPartitionId; - } - }; - /** @brief Gets the partition ID for the data batch * * @return std::string @@ -124,19 +108,26 @@ namespace Azure { namespace Messaging { namespace EventHubs { * * @return uint64_t */ - uint64_t GetMaxBytes() const { return m_maxBytes; } + uint64_t GetMaxBytes() const { return m_maxBytes.Value(); } - /** @brief Adds a message to the data batch + /** @brief Attempts to add a raw AMQP message to the data batch + * + * @param message The AMQP message to add to the batch + * + * @returns true if the message was added to the batch, false otherwise. + */ + bool TryAddMessage(Azure::Core::Amqp::Models::AmqpMessage const& message) + { + return TryAddAmqpMessage(message); + } + + /** @brief Attempts to add a message to the data batch * * @param message The message to add to the batch - */ - void AddMessage(Azure::Core::Amqp::Models::AmqpMessage message) { AddAmqpMessage(message); } - - /** @brief Adds a message to the data batch * - * @param message The message to add to the batch + * @returns true if the message was added to the batch, false otherwise. */ - void AddMessage(Azure::Messaging::EventHubs::Models::EventData& message); + bool TryAddMessage(Azure::Messaging::EventHubs::Models::EventData const& message); /** @brief Gets the number of messages in the batch * @@ -155,7 +146,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const; private: - void AddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message); + bool TryAddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message); size_t CalculateActualSizeForPayload(std::vector const& payload) { @@ -179,5 +170,26 @@ namespace Azure { namespace Messaging { namespace EventHubs { batchEnvelope.MessageFormat = BatchedMessageFormat; return batchEnvelope; } + + /** @brief Event Data Batch constructor + * + * @param options Options settings for creating the data batch + */ + EventDataBatch(EventDataBatchOptions options = {}) + : m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey}, + m_maxBytes{options.MaxBytes}, m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0} + { + if (!options.PartitionId.empty() && !options.PartitionKey.empty()) + { + throw std::runtime_error("Either PartionID or PartitionKey can be set, but not both."); + } + + if (options.PartitionId.empty()) + { + m_partitionId = anyPartitionId; + } + }; + + friend class _detail::EventDataBatchFactory; }; }}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/eventhubs_exception.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/eventhubs_exception.hpp index cdecce39e..e4ce140af 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/eventhubs_exception.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/eventhubs_exception.hpp @@ -62,7 +62,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { * If this field is set to true, then retrying the operation may succeed at a later time. * */ - bool IsTransient; + bool IsTransient{}; friend _detail::EventHubsExceptionFactory; }; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp index 247ae2fde..7b1d02b37 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp @@ -41,7 +41,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { /** The sequence number of the last observed event to be enqueued in the partition. */ int64_t LastEnqueuedSequenceNumber{}; /** The offset of the last observed event to be enqueued in the partition */ - std::string LastEnqueuedOffset; + int64_t LastEnqueuedOffset{}; /** The date and time, in UTC, that the last observed event was enqueued in the partition. */ Azure::DateTime LastEnqueuedTimeUtc; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp index 8e6ee8d28..b9dfb669c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp @@ -9,7 +9,11 @@ #include #include #include + namespace Azure { namespace Messaging { namespace EventHubs { + namespace _detail { + class PartitionClientFactory; + } /**brief PartitionClientOptions provides options for the ConsumerClient::CreatePartitionClient * function. */ @@ -50,24 +54,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { * This type is instantiated from the [ConsumerClient] type, using * [ConsumerClient.CreatePartitionClient]. */ - class PartitionClient final { - - /// The message receivers used to receive events from the partition. - std::vector m_receivers{}; - - /// The name of the offset to start receiving events from. - std::string m_offsetExpression; - - /// The options used to create the PartitionClient. - PartitionClientOptions m_partitionOptions; - - /// The name of the partition. - std::string m_partitionId; - - /** @brief RetryOptions controls how many times we should retry an operation in - * response to being throttled or encountering a transient error. - */ - Azure::Core::Http::Policies::RetryOptions RetryOptions{}; + class PartitionClient final : private Azure::Core::Amqp::_internal::MessageReceiverEvents { public: /// Create a PartitionClient from another PartitionClient @@ -76,6 +63,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { /// Assign a PartitionClient to another PartitionClient PartitionClient& operator=(PartitionClient const& other) = default; + /** Destroy this partition client. + */ + virtual ~PartitionClient(); + /** Receive events from the partition. * * @param maxMessages The maximum number of messages to receive. @@ -89,32 +80,54 @@ namespace Azure { namespace Messaging { namespace EventHubs { /** @brief Closes the connection to the Event Hub service. */ - void Close() - { - for (size_t i = 0; i < m_receivers.size(); i++) - { - m_receivers[i].Close(); - } - } + void Close() { m_receiver.Close(); } + + private: + friend class _detail::PartitionClientFactory; + /// The message receiver used to receive events from the partition. + Azure::Core::Amqp::_internal::MessageReceiver m_receiver; + + /// The name of the offset to start receiving events from. + // std::string m_offsetExpression; + + /// The options used to create the PartitionClient. + PartitionClientOptions m_partitionOptions; + + /// The name of the partition. + // std::string m_partitionId; + + /** @brief RetryOptions controls how many times we should retry an operation in + * response to being throttled or encountering a transient error. + */ + Azure::Core::Http::Policies::RetryOptions m_retryOptions{}; + + // Azure::Core::Amqp::Common::_internal::AsyncOperationQueue< + // Azure::Core::Amqp::Models::AmqpMessage, + // Azure::Core::Amqp::Models::_internal::AmqpError> + // m_receivedMessageQueue; /** Creates a new PartitionClient * - * @param options The options used to create the PartitionClient. - * @param retryOptions The retry options used to create the PartitionClient. - * + * @param messageReceiver Message Receiver for the partition client. + * @param options options used to create the PartitionClient. + * @param retryOptions controls how many times we should retry an operation in response to being + * throttled or encountering a transient error. */ PartitionClient( + Azure::Core::Amqp::_internal::MessageReceiver const& messageReceiver, PartitionClientOptions options, - Azure::Core::Http::Policies::RetryOptions retryOptions) - { - m_partitionOptions = options; - RetryOptions = retryOptions; - } + Azure::Core::Http::Policies::RetryOptions retryOptions); - /// @brief Push the message receiver back to the vector of receivers. - void PushBackReceiver(Azure::Core::Amqp::_internal::MessageReceiver& receiver) - { - m_receivers.push_back(std::move(receiver)); - } + std::string GetStartExpression(Models::StartPosition const& startPosition); + + virtual void OnMessageReceiverStateChanged( + Azure::Core::Amqp::_internal::MessageReceiver const& receiver, + Azure::Core::Amqp::_internal::MessageReceiverState newState, + Azure::Core::Amqp::_internal::MessageReceiverState oldState); + virtual Azure::Core::Amqp::Models::AmqpValue OnMessageReceived( + Azure::Core::Amqp::_internal::MessageReceiver const& receiver, + Azure::Core::Amqp::Models::AmqpMessage const& message); + virtual void OnMessageReceiverDisconnected( + Azure::Core::Amqp::Models::_internal::AmqpError const& error); }; }}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/processor.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/processor.hpp index 504c06dbd..dc79a89be 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/processor.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/processor.hpp @@ -63,7 +63,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { */ class Processor final { #ifdef TESTING_BUILD_AMQP - friend class Test::ProcessorTest_LoadBalancing_Test; #endif diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp index 0ad20d68c..cd8555dcb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp @@ -113,14 +113,43 @@ namespace Azure { namespace Messaging { namespace EventHubs { m_senders.clear(); } - /**@brief Proceeds to send and EventDataBatch + /** @brief Create a new EventDataBatch to be sent to the Event Hub. + * + * @param options Optional batch options + * @param context Context for the operation can be used for request cancellation. + * + * @return newly created EventDataBatch object. + */ + EventDataBatch CreateBatch( + EventDataBatchOptions const& options = {}, + Azure::Core::Context const& context = {}); + + /**@brief Send an EventDataBatch to the remote Event Hub. * * @param eventDataBatch Batch to send * @param context Request context */ - bool SendEventDataBatch( - EventDataBatch const& eventDataBatch, - Core::Context const& context = {}); + void Send(EventDataBatch const& eventDataBatch, Core::Context const& context = {}); + + /**@brief Send an EventData to the remote Event Hub. + * + * @remark This method will create a new EventDataBatch and add the event to it. If the event + * exceeds the maximum size allowed by the Event Hubs service, an exception will be thrown. + * + * @param eventData event to send + * @param context Request context + */ + void Send(Models::EventData const& eventData, Core::Context const& context = {}); + + /**@brief Send a vector of EventData items to the remote Event Hub. + * + * @remark This method will create a new EventDataBatch and add the events to it. If the events + * exceeds the maximum size allowed by the Event Hubs service, an exception will be thrown. + * + * @param eventData events to send + * @param context Request context + */ + void Send(std::vector const& eventData, Core::Context const& context = {}); /**@brief GetEventHubProperties gets properties of an eventHub. This includes data * like name, and partitions. @@ -141,7 +170,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { Core::Context const& context = {}); private: + void EnsureSender( + std::string const& partitionId = "", + Azure::Core::Context const& context = {}); Azure::Core::Amqp::_internal::MessageSender GetSender(std::string const& partitionId = ""); - void CreateSender(std::string const& partitionId = ""); + + void EnsureSession(std::string const& partitionId); + Azure::Core::Amqp::_internal::Session GetSession(std::string const& partitionId = ""); }; }}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/samples/README.md b/sdk/eventhubs/azure-messaging-eventhubs/samples/README.md index 502ae9733..b80b9efb4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/samples/README.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/samples/README.md @@ -20,6 +20,26 @@ granted access to the eventhubs service instance. The tests also assume that the currently logged on user is authorized to call into the Event Hubs service instance because they use [Azure::Core::Credentials::TokenCredential](https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-core/1.3.1/class_azure_1_1_core_1_1_credentials_1_1_token_credential.html) for authorization. +### Setting Environment Variables + +For the samples which use a connection string, the connection string can be retrieved using the Azure CLI with the following: + +```pwsh +az eventhubs namespace authorization-rule keys list --resource-group --namespace-name --name RootManageSharedAccessKey +``` + +```json +{ + "keyName": "RootManageSharedAccessKey", + "primaryConnectionString": "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED", + "primaryKey": "REDACTED", + "secondaryConnectionString": "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED", + "secondaryKey": "REDACTED" +} +``` + +The value of the `primaryConnectionString` property should be used as the `EVENTHUBS_CONNECTION_STRING` environment variable. + ## Samples diff --git a/sdk/eventhubs/azure-messaging-eventhubs/samples/consume-events/consume_events.cpp b/sdk/eventhubs/azure-messaging-eventhubs/samples/consume-events/consume_events.cpp index 834bbba20..5b781bbdb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/samples/consume-events/consume_events.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/samples/consume-events/consume_events.cpp @@ -11,8 +11,13 @@ // Both of these should be available from the Azure portal. // +#include +#include #include +using namespace Azure::Core::Diagnostics; +using namespace Azure::Core::Diagnostics::_internal; + #include int main() @@ -38,6 +43,7 @@ int main() // Retrieve properties about the EventHubs instance just created. auto eventhubProperties{consumerClient.GetEventHubProperties()}; std::cout << "Created event hub, properties: " << eventhubProperties << std::endl; + Log::Stream(Logger::Level::Verbose) << "Created event hub, properties: " << eventhubProperties; // Retrieve properties about the EventHubs instance just created. auto partitionProperties{ @@ -50,6 +56,20 @@ int main() Azure::Messaging::EventHubs::PartitionClientOptions partitionClientOptions; partitionClientOptions.StartPosition.Earliest = true; partitionClientOptions.StartPosition.Inclusive = true; + + Log::Stream(Logger::Level::Verbose) + << "Creating partition client. Start position: " << partitionClientOptions.StartPosition; + Log::Stream(Logger::Level::Verbose) + << "Creating partition client. Start position: " << partitionClientOptions.StartPosition; + + Log::Stream(Logger::Level::Verbose) << "earliest: HasValue: " << std::boolalpha + << partitionClientOptions.StartPosition.Earliest.HasValue(); + if (partitionClientOptions.StartPosition.Earliest.HasValue()) + { + std::cerr << "earliest: Value: " << std::boolalpha + << partitionClientOptions.StartPosition.Earliest.Value() << std::endl; + } + Azure::Messaging::EventHubs::PartitionClient partitionClient{consumerClient.CreatePartitionClient( eventhubProperties.PartitionIds[0], partitionClientOptions)}; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events.cpp b/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events.cpp index b667e986d..75cb4d175 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events.cpp @@ -44,42 +44,35 @@ int main() // configure this batch processor to send to that partition. Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; batchOptions.PartitionId = eventhubProperties.PartitionIds[0]; - Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions); + Azure::Messaging::EventHubs::EventDataBatch batch(producerClient.CreateBatch(batchOptions)); // Send an event with a simple binary body. { Azure::Messaging::EventHubs::Models::EventData event; event.Body = {1, 3, 5, 7}; event.MessageId = "test-message-id"; - batch.AddMessage(event); + batch.TryAddMessage(event); } { Azure::Messaging::EventHubs::Models::EventData event; event.Body = {2, 4, 6, 8, 10}; - batch.AddMessage(event); + batch.TryAddMessage(event); } // Send an event with a body initialized at EventData constructor time. { Azure::Messaging::EventHubs::Models::EventData event{1, 1, 2, 3, 5, 8}; event.MessageId = "test-message-id-fibonacci"; - batch.AddMessage(event); + batch.TryAddMessage(event); } // Send an event with a UTF-8 encoded string body. { Azure::Messaging::EventHubs::Models::EventData event{"Hello Eventhubs!"}; event.MessageId = "test-message-id-hellowworld"; - batch.AddMessage(event); + batch.TryAddMessage(event); } - if (!producerClient.SendEventDataBatch(batch)) - { - std::cerr << "Failed to send message to the Event Hub instance." << std::endl; - } - else - { - std::cout << "Sent message to the Event Hub instance." << std::endl; - } + producerClient.Send(batch); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events_aad.cpp b/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events_aad.cpp index 99d46e5ae..2d2aa4701 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events_aad.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/samples/produce-events/produce_events_aad.cpp @@ -54,38 +54,31 @@ int main() // configure this batch processor to send to that partition. Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; batchOptions.PartitionId = eventhubProperties.PartitionIds[0]; - Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions); + Azure::Messaging::EventHubs::EventDataBatch batch{producerClient.CreateBatch(batchOptions)}; // Send an event with a simple binary body. { Azure::Messaging::EventHubs::Models::EventData event; event.Body = {1, 3, 5, 7}; event.MessageId = "test-message-id"; - batch.AddMessage(event); + batch.TryAddMessage(event); } { Azure::Messaging::EventHubs::Models::EventData event; event.Body = {2, 4, 6, 8, 10}; event.MessageId = "test-message-id-2"; - batch.AddMessage(event); + batch.TryAddMessage(event); } { Azure::Messaging::EventHubs::Models::EventData event{1, 1, 2, 3, 5, 8}; event.MessageId = "test-message-id5"; - batch.AddMessage(event); + batch.TryAddMessage(event); } { Azure::Messaging::EventHubs::Models::EventData event{"Hello Eventhubs via AAD!"}; event.MessageId = "test-message-id4"; - batch.AddMessage(event); + batch.TryAddMessage(event); } - if (!producerClient.SendEventDataBatch(batch)) - { - std::cerr << "Failed to send message to the Event Hub instance." << std::endl; - } - else - { - std::cout << "Sent message to the Event Hub instance." << std::endl; - } + producerClient.Send(batch); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/checkpoint_store.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/checkpoint_store.cpp index 5a67f0cf9..f657f726f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/checkpoint_store.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/checkpoint_store.cpp @@ -26,7 +26,9 @@ std::string Azure::Messaging::EventHubs::Models::Ownership::GetOwnershipPrefixNa throw std::runtime_error("missing ownership fields"); } std::stringstream strstr; - strstr << FullyQualifiedNamespace << "/" << EventHubName << "/" << ConsumerGroup << "/ownership/"; + strstr << Azure::Core::_internal::StringExtensions::ToLower(FullyQualifiedNamespace) << "/" + << Azure::Core::_internal::StringExtensions::ToLower(EventHubName) << "/" + << Azure::Core::_internal::StringExtensions::ToLower(ConsumerGroup) << "/ownership/"; return strstr.str(); } @@ -38,8 +40,9 @@ std::string Azure::Messaging::EventHubs::Models::Checkpoint::GetCheckpointBlobPr throw std::runtime_error("missing checkpoint fields"); } std::stringstream strstr; - strstr << FullyQualifiedNamespaceName << "/" << EventHubName << "/" << ConsumerGroup - << "/checkpoint/"; + strstr << Azure::Core::_internal::StringExtensions::ToLower(FullyQualifiedNamespaceName) << "/" + << Azure::Core::_internal::StringExtensions::ToLower(EventHubName) << "/" + << Azure::Core::_internal::StringExtensions::ToLower(ConsumerGroup) << "/checkpoint/"; return strstr.str(); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp index 82ae34a7f..2dbbd95e9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp @@ -48,179 +48,69 @@ namespace Azure { namespace Messaging { namespace EventHubs { + m_consumerGroup; } - namespace { - struct FilterDescription + void ConsumerClient::EnsureSession(std::string const& partitionId) + { + if (m_sessions.find(partitionId) == m_sessions.end()) { - std::string Name; - std::uint64_t Code; - }; - void AddFilterElementToSourceOptions( - Azure::Core::Amqp::Models::_internal::MessageSourceOptions& sourceOptions, - FilterDescription description, - Azure::Core::Amqp::Models::AmqpValue const& filterValue) - { - Azure::Core::Amqp::Models::AmqpDescribed value{description.Code, filterValue}; - sourceOptions.Filter.emplace(description.Name, value); - } + ConnectionOptions connectOptions; + connectOptions.ContainerId = m_consumerClientOptions.ApplicationID; + connectOptions.EnableTrace = true; + connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"}; - FilterDescription SelectorFilter{"apache.org:selector-filter:string", 0x0000468c00000004}; - } // namespace + // Set the user agent related properties in the connectOptions based on the package + // information and application ID. + _detail::EventHubsUtilities::SetUserAgent( + connectOptions, m_consumerClientOptions.ApplicationID); + + Connection connection(m_fullyQualifiedNamespace, m_credential, connectOptions); + SessionOptions sessionOptions; + sessionOptions.InitialIncomingWindowSize + = static_cast(std::numeric_limits::max()); + + Session session{connection.CreateSession(sessionOptions)}; + m_sessions.emplace(partitionId, session); + } + } + + Azure::Core::Amqp::_internal::Session ConsumerClient::GetSession(std::string const& partitionId) + { + return m_sessions.at(partitionId); + } PartitionClient ConsumerClient::CreatePartitionClient( - std::string partitionId, - PartitionClientOptions const& options) + std::string const& partitionId, + PartitionClientOptions const& options, + Azure::Core::Context const& context) { - PartitionClient partitionClient(options, m_consumerClientOptions.RetryOptions); - std::string suffix = !partitionId.empty() ? "/Partitions/" + partitionId : ""; std::string hostUrl = m_hostUrl + suffix; - ConnectionOptions connectOptions; - connectOptions.ContainerId = m_consumerClientOptions.ApplicationID; - connectOptions.EnableTrace = true; - connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"}; + EnsureSession(partitionId); - // Set the user agent related properties in the connectOptions based on the package information - // and application ID. - _detail::EventHubsUtilities::SetUserAgent( - connectOptions, m_consumerClientOptions.ApplicationID); - - Connection connection(m_fullyQualifiedNamespace, m_credential, connectOptions); - SessionOptions sessionOptions; - sessionOptions.InitialIncomingWindowSize = static_cast( - m_consumerClientOptions.MaxMessageSize.ValueOr(std::numeric_limits::max())); - - Session session{connection.CreateSession(sessionOptions)}; - - Azure::Core::Amqp::Models::_internal::MessageSourceOptions sourceOptions; - sourceOptions.Address = static_cast(hostUrl); - AddFilterElementToSourceOptions( - sourceOptions, - SelectorFilter, - static_cast( - GetStartExpression(options.StartPosition))); - - Azure::Core::Amqp::Models::_internal::MessageSource messageSource(sourceOptions); - Azure::Core::Amqp::_internal::MessageReceiverOptions receiverOptions; - if (m_consumerClientOptions.MaxMessageSize) - { - receiverOptions.MaxMessageSize = m_consumerClientOptions.MaxMessageSize.Value(); - } - receiverOptions.EnableTrace = true; - // receiverOptions.MessageTarget = m_consumerClientOptions.MessageTarget; - receiverOptions.Name = m_consumerClientOptions.Name; - receiverOptions.Properties.emplace("com.microsoft:receiver-name", m_consumerClientOptions.Name); - if (options.OwnerLevel.HasValue()) - { - receiverOptions.Properties.emplace("com.microsoft:epoch", options.OwnerLevel.Value()); - } - - MessageReceiver receiver = session.CreateMessageReceiver(messageSource, receiverOptions); - - // Open the connection to the remote. - receiver.Open(); - m_sessions.emplace(partitionId, session); - partitionClient.PushBackReceiver(receiver); - return partitionClient; - } - - std::string ConsumerClient::GetStartExpression(Models::StartPosition const& startPosition) - { - std::string greaterThan = ">"; - - if (startPosition.Inclusive) - { - greaterThan = ">="; - } - - constexpr const char* expressionErrorText - = "Only a single start point can be set: Earliest, EnqueuedTime, " - "Latest, Offset, or SequenceNumber"; - - std::string returnValue; - if (startPosition.EnqueuedTime.HasValue()) - { - returnValue = "amqp.annotation.x--opt-enqueued-time " + greaterThan + "'" - + std::to_string(std::chrono::duration_cast( - startPosition.EnqueuedTime.Value().time_since_epoch()) - .count()) - + "'"; - } - if (startPosition.Offset.HasValue()) - { - if (!returnValue.empty()) - { - throw std::runtime_error(expressionErrorText); - } - returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'" - + std::to_string(startPosition.Offset.Value()) + "'"; - } - if (startPosition.SequenceNumber.HasValue()) - { - if (!returnValue.empty()) - { - throw std::runtime_error(expressionErrorText); - } - returnValue = "amqp.annotation.x-opt-sequence-number " + greaterThan + "'" - + std::to_string(startPosition.SequenceNumber.Value()) + "'"; - } - if (startPosition.Latest.HasValue()) - { - if (!returnValue.empty()) - { - throw std::runtime_error(expressionErrorText); - } - returnValue = "amqp.annotation.x-opt-offset > '@latest'"; - } - if (startPosition.Earliest.HasValue()) - { - if (!returnValue.empty()) - { - throw std::runtime_error(expressionErrorText); - } - returnValue = "amqp.annotation.x-opt-offset > '-1'"; - } - // If we don't have a filter value, then default to the start. - if (returnValue.empty()) - { - return "amqp.annotation.x-opt-offset > '@latest'"; - } - else - { - return returnValue; - } + return _detail::PartitionClientFactory::CreatePartitionClient( + GetSession(partitionId), + hostUrl, + m_consumerClientOptions.Name, + options, + m_consumerClientOptions.RetryOptions, + context); } Models::EventHubProperties ConsumerClient::GetEventHubProperties(Core::Context const& context) { - // We need to capture the partition client here, because we need to keep it alive across the - // call to GetEventHubsProperties. - // - // If we don't keep the PartitionClient alive, the message receiver inside the partition client - // will be disconnected AFTER the outgoing ATTACH frame is sent. When the response for the - // ATTACH frame is received, it creates a new link_endpoint which is in the half attached state. - // This runs into a uAMQP bug where an incoming link detach frame will cause a crash if the - // corresponding link_endpoint is in the half attached state. - std::shared_ptr client; - if (m_sessions.find("0") == m_sessions.end()) - { - client = std::make_shared(CreatePartitionClient("0")); - } + // Since EventHub properties are not tied to a partition, we don't specify a partition ID. + EnsureSession(); - return _detail::EventHubsUtilities::GetEventHubsProperties( - m_sessions.at("0"), m_eventHub, context); + return _detail::EventHubsUtilities::GetEventHubsProperties(GetSession(), m_eventHub, context); } Models::EventHubPartitionProperties ConsumerClient::GetPartitionProperties( std::string const& partitionId, Core::Context const& context) { - if (m_sessions.find(partitionId) == m_sessions.end()) - { - CreatePartitionClient(partitionId); - } + EnsureSession(partitionId); return _detail::EventHubsUtilities::GetEventHubsPartitionProperties( - m_sessions.at(partitionId), m_eventHub, partitionId, context); + GetSession(partitionId), m_eventHub, partitionId, context); } }}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/event_data_batch.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/event_data_batch.cpp index 462be806e..8eaae8624 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/event_data_batch.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/event_data_batch.cpp @@ -14,9 +14,9 @@ using namespace Azure::Core::Diagnostics; namespace Azure { namespace Messaging { namespace EventHubs { - void EventDataBatch::AddMessage(Azure::Messaging::EventHubs::Models::EventData& message) + bool EventDataBatch::TryAddMessage(Azure::Messaging::EventHubs::Models::EventData const& message) { - AddAmqpMessage(message.GetRawAmqpMessage()); + return TryAddAmqpMessage(message.GetRawAmqpMessage()); } Azure::Core::Amqp::Models::AmqpMessage EventDataBatch::ToAmqpMessage() const @@ -48,7 +48,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { return returnValue; } - void EventDataBatch::AddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message) + bool EventDataBatch::TryAddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message) { std::lock_guard lock(m_rwMutex); @@ -75,16 +75,23 @@ namespace Azure { namespace Messaging { namespace EventHubs { m_currentSize = serializedMessage.size(); } auto actualPayloadSize = CalculateActualSizeForPayload(serializedMessage); - if (m_currentSize + actualPayloadSize > m_maxBytes) + if (m_currentSize + actualPayloadSize > m_maxBytes.Value()) { m_currentSize = 0; m_batchEnvelope = nullptr; - - throw std::runtime_error("EventDataBatch size is too large."); + return false; } m_currentSize += actualPayloadSize; m_marshalledMessages.push_back(serializedMessage); + return true; } }}} // namespace Azure::Messaging::EventHubs + +namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail { + EventDataBatch EventDataBatchFactory::CreateEventDataBatch(EventDataBatchOptions const& options) + { + return EventDataBatch{options}; + } +}}}} // namespace Azure::Messaging::EventHubs::_detail diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp index 9562ed59b..cc5306dd7 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp @@ -4,12 +4,167 @@ #include "azure/messaging/eventhubs/partition_client.hpp" #include "azure/messaging/eventhubs/eventhubs_exception.hpp" +#include "private/eventhubs_constants.hpp" #include "private/eventhubs_utilities.hpp" #include "private/retry_operation.hpp" #include +using namespace Azure::Core::Diagnostics::_internal; +using namespace Azure::Core::Diagnostics; + namespace Azure { namespace Messaging { namespace EventHubs { + namespace { + struct FilterDescription + { + std::string Name; + std::uint64_t Code; + }; + void AddFilterElementToSourceOptions( + Azure::Core::Amqp::Models::_internal::MessageSourceOptions& sourceOptions, + FilterDescription description, + Azure::Core::Amqp::Models::AmqpValue const& filterValue) + { + Azure::Core::Amqp::Models::AmqpDescribed value{description.Code, filterValue}; + sourceOptions.Filter.emplace(description.Name, value); + } + + FilterDescription SelectorFilter{"apache.org:selector-filter:string", 0x0000468c00000004}; + + std::string GetStartExpression(Models::StartPosition const& startPosition) + { + Log::Stream(Logger::Level::Verbose) + << "Get Start Expression, startPosition: " << startPosition; + std::string greaterThan = ">"; + + if (startPosition.Inclusive) + { + greaterThan = ">="; + } + + constexpr const char* expressionErrorText + = "Only a single start point can be set: Earliest, EnqueuedTime, " + "Latest, Offset, or SequenceNumber"; + + std::string returnValue; + if (startPosition.EnqueuedTime.HasValue()) + { + returnValue = "amqp.annotation.x-opt-enqueued-time " + greaterThan + "'" + + std::to_string(std::chrono::duration_cast( + static_cast( + startPosition.EnqueuedTime.Value()) + .time_since_epoch()) + .count()) + + "'"; + } + if (startPosition.Offset.HasValue()) + { + if (!returnValue.empty()) + { + throw std::runtime_error(expressionErrorText); + } + returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'" + + std::to_string(startPosition.Offset.Value()) + "'"; + } + if (startPosition.SequenceNumber.HasValue()) + { + if (!returnValue.empty()) + { + throw std::runtime_error(expressionErrorText); + } + returnValue = "amqp.annotation.x-opt-sequence-number " + greaterThan + "'" + + std::to_string(startPosition.SequenceNumber.Value()) + "'"; + } + if (startPosition.Latest.HasValue()) + { + if (!returnValue.empty()) + { + throw std::runtime_error(expressionErrorText); + } + returnValue = "amqp.annotation.x-opt-offset > '@latest'"; + } + if (startPosition.Earliest.HasValue()) + { + if (!returnValue.empty()) + { + throw std::runtime_error(expressionErrorText); + } + returnValue = "amqp.annotation.x-opt-offset > '-1'"; + } + // If we don't have a filter value, then default to the start. + if (returnValue.empty()) + { + Log::Stream(Logger::Level::Verbose) << "No return value, use default."; + return "amqp.annotation.x-opt-offset > '@latest'"; + } + else + { + Log::Stream(Logger::Level::Verbose) << "Get Start Expression, returnValue: " << returnValue; + return returnValue; + } + } + + // Helper function to create a message receiver. + Azure::Core::Amqp::_internal::MessageReceiver CreateMessageReceiver( + Azure::Core::Amqp::_internal::Session const& session, + std::string const& partitionUrl, + std::string const& receiverName, + PartitionClientOptions const& options, + Azure::Core::Amqp::_internal::MessageReceiverEvents* events = nullptr) + { + Azure::Core::Amqp::Models::_internal::MessageSourceOptions sourceOptions; + sourceOptions.Address = static_cast(partitionUrl); + AddFilterElementToSourceOptions( + sourceOptions, + SelectorFilter, + static_cast( + GetStartExpression(options.StartPosition))); + + Azure::Core::Amqp::Models::_internal::MessageSource messageSource(sourceOptions); + Azure::Core::Amqp::_internal::MessageReceiverOptions receiverOptions; + + receiverOptions.EnableTrace = true; + // Set the link credit to the prefetch count. If the user has not set a prefetch count, then + // we will use the default value. + if (options.Prefetch >= 0) + { + receiverOptions.MaxLinkCredit = options.Prefetch; + } + receiverOptions.Name = receiverName; + receiverOptions.Properties.emplace("com.microsoft:receiver-name", receiverName); + if (options.OwnerLevel.HasValue()) + { + receiverOptions.Properties.emplace("com.microsoft:epoch", options.OwnerLevel.Value()); + } + return session.CreateMessageReceiver(messageSource, receiverOptions, events); + } + } // namespace + + PartitionClient _detail::PartitionClientFactory::CreatePartitionClient( + Azure::Core::Amqp::_internal::Session const& session, + std::string const& partitionUrl, + std::string const& receiverName, + PartitionClientOptions options, + Azure::Core::Http::Policies::RetryOptions retryOptions, + Azure::Core::Context const& context) + { + Azure::Core::Amqp::_internal::MessageReceiver messageReceiver{ + CreateMessageReceiver(session, partitionUrl, receiverName, options)}; + messageReceiver.Open(context); + + return PartitionClient(std::move(messageReceiver), std::move(options), std::move(retryOptions)); + } + + PartitionClient::PartitionClient( + Azure::Core::Amqp::_internal::MessageReceiver const& messageReceiver, + PartitionClientOptions options, + Azure::Core::Http::Policies::RetryOptions retryOptions) + : m_receiver{messageReceiver}, m_partitionOptions{options}, m_retryOptions{retryOptions} + { + } + + PartitionClient::~PartitionClient() {} + /** Receive events from the partition. * * @param maxMessages The maximum number of messages to receive. @@ -22,11 +177,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { Core::Context const& context) { std::vector messages; - // bool prefetchDisabled = m_prefetchCount < 0; while (messages.size() < maxMessages && !context.IsCancelled()) { - auto message = m_receivers[0].WaitForIncomingMessage(context); + auto message = m_receiver.WaitForIncomingMessage(context); if (message.first.HasValue()) { messages.push_back(Models::ReceivedEventData{message.first.Value()}); @@ -38,4 +192,26 @@ namespace Azure { namespace Messaging { namespace EventHubs { } return messages; } + void PartitionClient::OnMessageReceiverStateChanged( + Azure::Core::Amqp::_internal::MessageReceiver const& receiver, + Azure::Core::Amqp::_internal::MessageReceiverState newState, + Azure::Core::Amqp::_internal::MessageReceiverState oldState) + { + (void)receiver; + (void)newState; + (void)oldState; + } + Azure::Core::Amqp::Models::AmqpValue PartitionClient::OnMessageReceived( + Azure::Core::Amqp::_internal::MessageReceiver const& receiver, + Azure::Core::Amqp::Models::AmqpMessage const& message) + { + (void)receiver; + (void)message; + return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryAccepted(); + } + void PartitionClient::OnMessageReceiverDisconnected( + Azure::Core::Amqp::Models::_internal::AmqpError const& error) + { + (void)error; + } }}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp b/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp index 2c52217f8..3d0273fe4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp @@ -4,13 +4,16 @@ // Useful utilities for the Event Hubs Clients. #pragma once +#include "azure/messaging/eventhubs/event_data_batch.hpp" #include "azure/messaging/eventhubs/eventhubs_exception.hpp" #include "azure/messaging/eventhubs/models/management_models.hpp" +#include "azure/messaging/eventhubs/partition_client.hpp" #include "package_version.hpp" #include #include #include +#include #include #include @@ -60,6 +63,24 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail Azure::Core::Amqp::Models::_internal::AmqpErrorCondition const& condition); }; + class EventDataBatchFactory final { + public: + static EventDataBatch CreateEventDataBatch(EventDataBatchOptions const& options); + EventDataBatchFactory() = delete; + }; + + class PartitionClientFactory final { + public: + static PartitionClient CreatePartitionClient( + Azure::Core::Amqp::_internal::Session const& session, + std::string const& partitionUrl, + std::string const& receiverName, + PartitionClientOptions options, + Azure::Core::Http::Policies::RetryOptions retryOptions, + Azure::Core::Context const& context); + PartitionClientFactory() = delete; + }; + class EventHubsUtilities { public: @@ -91,12 +112,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail // Create a management client off the session. // Eventhubs management APIs return a status code in the "status-code" application properties. Azure::Core::Amqp::_internal::ManagementClientOptions managementClientOptions; - managementClientOptions.EnableTrace = false; + managementClientOptions.EnableTrace = true; managementClientOptions.ExpectedStatusCodeKeyName = "status-code"; Azure::Core::Amqp::_internal::ManagementClient managementClient{ session.CreateManagementClient(eventHubName, managementClientOptions)}; - managementClient.Open(); + managementClient.Open(context); // Send a message to the management endpoint to retrieve the properties of the eventhub. Azure::Core::Amqp::Models::AmqpMessage message; @@ -113,6 +134,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail Models::EventHubProperties properties; if (result.Status != Azure::Core::Amqp::_internal::ManagementOperationStatus::Ok) { + Azure::Core::Diagnostics::_internal::Log::Stream( + Azure::Core::Diagnostics::Logger::Level::Error) + << "Management operation failed. StatusCode: " << result.StatusCode + << " Error: " << result.Error; throw _detail::EventHubsExceptionFactory::CreateEventHubsException( result.Error, result.StatusCode); } @@ -154,12 +179,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail // Create a management client off the session. // Eventhubs management APIs return a status code in the "status-code" application properties. Azure::Core::Amqp::_internal::ManagementClientOptions managementClientOptions; - managementClientOptions.EnableTrace = false; + managementClientOptions.EnableTrace = true; managementClientOptions.ExpectedStatusCodeKeyName = "status-code"; Azure::Core::Amqp::_internal::ManagementClient managementClient{ session.CreateManagementClient(eventHubName, managementClientOptions)}; - managementClient.Open(); + managementClient.Open(context); // Send a message to the management endpoint to retrieve the properties of the eventhub. Azure::Core::Amqp::Models::AmqpMessage message; @@ -175,6 +200,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail message, context); + Azure::Core::Diagnostics::_internal::Log::Stream( + Azure::Core::Diagnostics::Logger::Level::Informational) + << "Received partition properties: " << result.Message; + Models::EventHubPartitionProperties properties; if (result.Status != Azure::Core::Amqp::_internal::ManagementOperationStatus::Ok) { @@ -198,7 +227,29 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail properties.PartitionId = static_cast(bodyMap["partition"]); properties.BeginningSequenceNumber = bodyMap["begin_sequence_number"]; properties.LastEnqueuedSequenceNumber = bodyMap["last_enqueued_sequence_number"]; - properties.LastEnqueuedOffset = static_cast(bodyMap["last_enqueued_offset"]); + // For the last enqueued offset is returned as a string. Convert to an int64. + properties.LastEnqueuedOffset = std::strtoull( + static_cast(bodyMap["last_enqueued_offset"]).c_str(), nullptr, 10); + + Azure::Core::Diagnostics::_internal::Log::Stream( + Azure::Core::Diagnostics::Logger::Level::Informational) + << "last enqueued time utc: " << bodyMap["last_enqueued_time_utc"]; + Azure::Core::Diagnostics::_internal::Log::Stream( + Azure::Core::Diagnostics::Logger::Level::Informational) + << "last enqueued time utc: " + << static_cast( + bodyMap["last_enqueued_time_utc"].AsTimestamp()) + .count() + << " ms"; + Azure::Core::Diagnostics::_internal::Log::Stream( + Azure::Core::Diagnostics::Logger::Level::Informational) + << "last enqueued time utc: " + << std::chrono::duration_cast( + static_cast( + bodyMap["last_enqueued_time_utc"].AsTimestamp())) + .count() + << " s"; + properties.LastEnqueuedTimeUtc = Azure::DateTime(std::chrono::system_clock::from_time_t( std::chrono::duration_cast( static_cast( diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp index 36bbe4146..bfe4658a2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp @@ -3,6 +3,7 @@ #include "azure/messaging/eventhubs/producer_client.hpp" +#include "azure/messaging/eventhubs/event_data_batch.hpp" #include "azure/messaging/eventhubs/eventhubs_exception.hpp" #include "private/eventhubs_utilities.hpp" #include "private/retry_operation.hpp" @@ -13,127 +14,177 @@ namespace { const std::string DefaultAuthScope = "https://eventhubs.azure.net/.default"; } -Azure::Messaging::EventHubs::ProducerClient::ProducerClient( - std::string const& connectionString, - std::string const& eventHub, - Azure::Messaging::EventHubs::ProducerClientOptions options) - : m_connectionString{connectionString}, m_eventHub{eventHub}, m_producerClientOptions(options) -{ - auto sasCredential - = std::make_shared( - connectionString); +namespace Azure { namespace Messaging { namespace EventHubs { - m_credential = sasCredential; - - m_eventHub = (sasCredential->GetEntityPath().empty() ? eventHub : sasCredential->GetEntityPath()); - m_fullyQualifiedNamespace = sasCredential->GetHostName(); -} - -Azure::Messaging::EventHubs::ProducerClient::ProducerClient( - std::string const& fullyQualifiedNamespace, - std::string const& eventHub, - std::shared_ptr credential, - Azure::Messaging::EventHubs::ProducerClientOptions options) - : m_fullyQualifiedNamespace{fullyQualifiedNamespace}, m_eventHub{eventHub}, - m_credential{credential}, m_producerClientOptions(options) -{ -} - -Azure::Core::Amqp::_internal::MessageSender Azure::Messaging::EventHubs::ProducerClient::GetSender( - std::string const& partitionId) -{ - if (m_senders.find(partitionId) == m_senders.end()) + ProducerClient::ProducerClient( + std::string const& connectionString, + std::string const& eventHub, + Azure::Messaging::EventHubs::ProducerClientOptions options) + : m_connectionString{connectionString}, m_eventHub{eventHub}, m_producerClientOptions(options) { - CreateSender(partitionId); + auto sasCredential + = std::make_shared( + connectionString); + + m_credential = sasCredential; + + m_eventHub + = (sasCredential->GetEntityPath().empty() ? eventHub : sasCredential->GetEntityPath()); + m_fullyQualifiedNamespace = sasCredential->GetHostName(); } - auto& sender = m_senders.at(partitionId); - return sender; -} - -void Azure::Messaging::EventHubs::ProducerClient::CreateSender(std::string const& partitionId) -{ - m_targetUrl = "amqps://" + m_fullyQualifiedNamespace + "/" + m_eventHub; - - Azure::Core::Amqp::_internal::ConnectionOptions connectOptions; - connectOptions.ContainerId = m_producerClientOptions.ApplicationID; - connectOptions.EnableTrace = true; - connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"}; - - // Set the UserAgent related properties on this message sender. - _detail::EventHubsUtilities::SetUserAgent(connectOptions, m_producerClientOptions.ApplicationID); - - std::string fullyQualifiedNamespace{m_fullyQualifiedNamespace}; - std::string targetUrl = m_targetUrl; - - if (!partitionId.empty()) + ProducerClient::ProducerClient( + std::string const& fullyQualifiedNamespace, + std::string const& eventHub, + std::shared_ptr credential, + Azure::Messaging::EventHubs::ProducerClientOptions options) + : m_fullyQualifiedNamespace{fullyQualifiedNamespace}, m_eventHub{eventHub}, + m_credential{credential}, m_producerClientOptions(options) { - targetUrl += "/Partitions/" + partitionId; } - Azure::Core::Amqp::_internal::Connection connection( - fullyQualifiedNamespace, m_credential, connectOptions); - - Azure::Core::Amqp::_internal::SessionOptions sessionOptions; - sessionOptions.InitialIncomingWindowSize = std::numeric_limits::max(); - sessionOptions.InitialOutgoingWindowSize = std::numeric_limits::max(); - - Azure::Core::Amqp::_internal::Session session{connection.CreateSession(sessionOptions)}; - m_sessions.emplace(partitionId, session); - Azure::Core::Amqp::_internal::MessageSenderOptions senderOptions; - - senderOptions.Name = m_producerClientOptions.Name; - senderOptions.EnableTrace = true; - senderOptions.MaxMessageSize = m_producerClientOptions.MaxMessageSize; - - Azure::Core::Amqp::_internal::MessageSender sender - = session.CreateMessageSender(targetUrl, senderOptions, nullptr); - sender.Open(); - m_senders.emplace(partitionId, sender); -} - -bool Azure::Messaging::EventHubs::ProducerClient::SendEventDataBatch( - EventDataBatch const& eventDataBatch, - Core::Context const& context) -{ - auto message = eventDataBatch.ToAmqpMessage(); - - Azure::Messaging::EventHubs::_detail::RetryOperation retryOp( - m_producerClientOptions.RetryOptions); - return retryOp.Execute([&]() -> bool { - auto result = GetSender(eventDataBatch.GetPartitionId()).Send(message, context); - auto sendStatus = std::get<0>(result); - if (sendStatus == Azure::Core::Amqp::_internal::MessageSendStatus::Ok) + void ProducerClient::EnsureSession(std::string const& partitionId = {}) + { + if (m_sessions.find(partitionId) == m_sessions.end()) { - return true; + Azure::Core::Amqp::_internal::ConnectionOptions connectOptions; + connectOptions.ContainerId = m_producerClientOptions.ApplicationID; + connectOptions.EnableTrace = true; + connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"}; + + // Set the UserAgent related properties on this message sender. + _detail::EventHubsUtilities::SetUserAgent( + connectOptions, m_producerClientOptions.ApplicationID); + + std::string fullyQualifiedNamespace{m_fullyQualifiedNamespace}; + + Azure::Core::Amqp::_internal::Connection connection( + fullyQualifiedNamespace, m_credential, connectOptions); + + Azure::Core::Amqp::_internal::SessionOptions sessionOptions; + sessionOptions.InitialIncomingWindowSize = std::numeric_limits::max(); + sessionOptions.InitialOutgoingWindowSize = std::numeric_limits::max(); + + Azure::Core::Amqp::_internal::Session session{connection.CreateSession(sessionOptions)}; + m_sessions.emplace(partitionId, session); } - // Throw an exception about the error we just received. - throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::CreateEventHubsException( - std::get<1>(result)); - }); -} - -Azure::Messaging::EventHubs::Models::EventHubProperties -Azure::Messaging::EventHubs::ProducerClient::GetEventHubProperties(Core::Context const& context) -{ - if (m_senders.find("") == m_senders.end()) - { - CreateSender(""); } - return _detail::EventHubsUtilities::GetEventHubsProperties( - m_sessions.at(""), m_eventHub, context); -} - -Azure::Messaging::EventHubs::Models::EventHubPartitionProperties -Azure::Messaging::EventHubs::ProducerClient::GetPartitionProperties( - std::string const& partitionId, - Core::Context const& context) -{ - if (m_senders.find(partitionId) == m_senders.end()) + Azure::Core::Amqp::_internal::Session ProducerClient::GetSession(std::string const& partitionId) { - CreateSender(partitionId); + return m_sessions.at(partitionId); } - return _detail::EventHubsUtilities::GetEventHubsPartitionProperties( - m_sessions.at(partitionId), m_eventHub, partitionId, context); -} + + void ProducerClient::EnsureSender( + std::string const& partitionId, + Azure::Core::Context const& context) + { + if (m_senders.find(partitionId) == m_senders.end()) + { + m_targetUrl = "amqps://" + m_fullyQualifiedNamespace + "/" + m_eventHub; + + EnsureSession(partitionId); + + std::string targetUrl = m_targetUrl; + + if (!partitionId.empty()) + { + targetUrl += "/Partitions/" + partitionId; + } + + Azure::Core::Amqp::_internal::MessageSenderOptions senderOptions; + senderOptions.Name = m_producerClientOptions.Name; + senderOptions.EnableTrace = true; + senderOptions.MaxMessageSize = m_producerClientOptions.MaxMessageSize; + + Azure::Core::Amqp::_internal::MessageSender sender + = GetSession(partitionId).CreateMessageSender(targetUrl, senderOptions, nullptr); + sender.Open(context); + m_senders.emplace(partitionId, sender); + } + } + Azure::Core::Amqp::_internal::MessageSender ProducerClient::GetSender( + std::string const& partitionId) + { + return m_senders.at(partitionId); + } + + EventDataBatch ProducerClient::CreateBatch( + EventDataBatchOptions const& options, + Core::Context const& context) + { + EnsureSender(options.PartitionId, context); + + auto messageSender = GetSender(options.PartitionId); + EventDataBatchOptions optionsToUse{options}; + if (!options.MaxBytes.HasValue()) + { + optionsToUse.MaxBytes = messageSender.GetMaxMessageSize(); + } + + return _detail::EventDataBatchFactory::CreateEventDataBatch(optionsToUse); + } + + void ProducerClient::Send(EventDataBatch const& eventDataBatch, Core::Context const& context) + { + auto message = eventDataBatch.ToAmqpMessage(); + + Azure::Messaging::EventHubs::_detail::RetryOperation retryOp( + m_producerClientOptions.RetryOptions); + retryOp.Execute([&]() -> bool { + auto result = GetSender(eventDataBatch.GetPartitionId()).Send(message, context); + auto sendStatus = std::get<0>(result); + if (sendStatus == Azure::Core::Amqp::_internal::MessageSendStatus::Ok) + { + return true; + } + // Throw an exception about the error we just received. + throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory:: + CreateEventHubsException(std::get<1>(result)); + }); + } + + void ProducerClient::Send(Models::EventData const& eventData, Core::Context const& context) + { + auto batch = CreateBatch(EventDataBatchOptions{}, context); + if (!batch.TryAddMessage(eventData)) + { + throw std::runtime_error("Could not add message to batch."); + } + Send(batch, context); + } + + void ProducerClient::Send( + std::vector const& eventData, + Core::Context const& context) + { + auto batch = CreateBatch(EventDataBatchOptions{}, context); + for (const auto& data : eventData) + { + if (!batch.TryAddMessage(data)) + { + throw std::runtime_error("Could not add message to batch."); + } + } + Send(batch, context); + } + + Models::EventHubProperties ProducerClient::GetEventHubProperties(Core::Context const& context) + { + // EventHub properties are not associated with a particular partition, so create a message + // sender on the empty partition. + EnsureSession(); + + return _detail::EventHubsUtilities::GetEventHubsProperties(GetSession(), m_eventHub, context); + } + + Models::EventHubPartitionProperties ProducerClient::GetPartitionProperties( + std::string const& partitionId, + Core::Context const& context) + { + EnsureSession(partitionId); + + return _detail::EventHubsUtilities::GetEventHubsPartitionProperties( + GetSession(partitionId), m_eventHub, partitionId, context); + } +}}} // namespace Azure::Messaging::EventHubs diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test-resources.json b/sdk/eventhubs/azure-messaging-eventhubs/test-resources.json index 0ee126aa8..5c7ab9d3b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test-resources.json +++ b/sdk/eventhubs/azure-messaging-eventhubs/test-resources.json @@ -65,7 +65,8 @@ }, "variables": { "apiVersion": "2017-04-01", - "eventHubName": "eventHub", + "eventHubName": "testEventHub", + "consumerGroup": "defaultGroup", "contributorRoleId": "b24988ac-6180-42a0-ab88-20f7382dd24c", "eventHubsDataOwnerRoleId": "f526a384-b230-433a-b45c-95f59c4a2dec", "storageDataOwnerRoleId": "b7e6dc6d-f1e8-4753-8033-0f276bb0955b", @@ -124,7 +125,7 @@ { "type": "Microsoft.EventHub/Namespaces/EventHubs/ConsumerGroups", "apiVersion": "[variables('apiVersion')]", - "name": "[concat(variables('eventHubsNamespace'), '/', variables('eventHubName'), '/$Default')]", + "name": "[concat(variables('eventHubsNamespace'), '/', variables('eventHubName'), '/', variables('consumerGroup'))]", "location": "[parameters('location')]", "dependsOn": [ "[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventHubsNamespace'), variables('eventHubName'))]", @@ -244,6 +245,10 @@ "type": "string", "value": "[listkeys(variables('eventHubsAuthRuleResourceId'), '2015-08-01').primaryConnectionString]" }, + "EVENTHUB_CONSUMER_GROUP": { + "type": "string", + "value": "[variables('consumerGroup')]" + }, "CHECKPOINTSTORE_STORAGE_CONNECTION_STRING": { "type": "string", "value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]" diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp index 0537f262e..eca8ed59e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/eventhubs-stress-test/eventhubs_stress_test.cpp @@ -107,7 +107,7 @@ private: Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; batchOptions.PartitionId = m_partitionId; - Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions); + Azure::Messaging::EventHubs::EventDataBatch batch(m_client->CreateBatch(batchOptions)); for (uint32_t j = 0; j < m_numberToSend; ++j) { @@ -117,9 +117,9 @@ private: event.Properties["PartitionId"] = static_cast(m_partitionId); AddEndProperty(event, m_numberToSend); - batch.AddMessage(event); + batch.TryAddMessage(event); } - m_client->SendEventDataBatch(batch, context); + m_client->Send(batch, context); auto afterSendProps = m_client->GetPartitionProperties(m_partitionId, context); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/perf/inc/azure/messaging/eventhubs/test/eventhubs_batch_perf_test.hpp b/sdk/eventhubs/azure-messaging-eventhubs/test/perf/inc/azure/messaging/eventhubs/test/eventhubs_batch_perf_test.hpp index f63b2dc4f..e486a05a3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/perf/inc/azure/messaging/eventhubs/test/eventhubs_batch_perf_test.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/perf/inc/azure/messaging/eventhubs/test/eventhubs_batch_perf_test.hpp @@ -39,8 +39,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace PerfTest uint32_t m_batchSize; uint32_t m_prefetchCount; uint64_t m_rounds; - uint32_t m_paddingBytes; - uint32_t m_maxDeadlineExceeded; + uint32_t m_paddingBytes{}; + uint32_t m_maxDeadlineExceeded{}; std::shared_ptr m_credential; std::unique_ptr m_client; @@ -148,7 +148,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace PerfTest Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; batchOptions.PartitionId = m_partitionId; - Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions); + Azure::Messaging::EventHubs::EventDataBatch batch{m_client->CreateBatch(batchOptions)}; for (uint32_t j = 0; j < m_numberToSend; ++j) { @@ -158,9 +158,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace PerfTest event.Properties["PartitionId"] = static_cast(m_partitionId); AddEndProperty(event, m_numberToSend); - batch.AddMessage(event); + batch.TryAddMessage(event); } - m_client->SendEventDataBatch(batch, context); + m_client->Send(batch, context); auto afterSendProps = m_client->GetPartitionProperties(m_partitionId, context); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/CMakeLists.txt b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/CMakeLists.txt index 5a8388f43..bcc8e5558 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/CMakeLists.txt +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/CMakeLists.txt @@ -17,14 +17,16 @@ SetUpTestProxy("sdk/eventhubs") ################## Unit Tests ########################## add_executable ( azure-messaging-eventhubs-test + checkpoint_store_test.cpp + consumer_client_test.cpp + event_data_test.cpp + processor_load_balancer_test.cpp + processor_test.cpp producer_client_test.cpp retry_operation_test.cpp - event_data_test.cpp - consumer_client_test.cpp - checkpoint_store_test.cpp - processor_load_balancer_test.cpp + round_trip_test.cpp test_checkpoint_store.hpp - processor_test.cpp) +) create_per_service_target_build(eventhubs azure-messaging-eventhubs-test) create_map_file(azure-messaging-eventhubs-test azure-messaging-eventhubs-test.map) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp index 3d757cc7f..82af1f7e5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp @@ -4,7 +4,6 @@ #include "eventhubs_test_base.hpp" #include -#include #include #include @@ -40,11 +39,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { TEST_F(CheckpointStoreTest, TestCheckpoints) { std::string const testName = GetRandomName(); + std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString( - Azure::Core::_internal::Environment::GetVariable( - "CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), - testName, - m_blobClientOptions)}; + GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)}; Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient); auto checkpoints = checkpointStore.ListCheckpoints( @@ -53,7 +50,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { EXPECT_EQ(0ul, checkpoints.size()); checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{ - "$Default", + consumerGroup, "event-hub-name", "ns.servicebus.windows.net", "partition-id", @@ -62,9 +59,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { }); checkpoints = checkpointStore.ListCheckpoints( - "ns.servicebus.windows.net", "event-hub-name", "$Default"); + "ns.servicebus.windows.net", "event-hub-name", consumerGroup); EXPECT_EQ(checkpoints.size(), 1ul); - EXPECT_EQ("$Default", checkpoints[0].ConsumerGroup); + EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup); EXPECT_EQ("event-hub-name", checkpoints[0].EventHubName); EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName); EXPECT_EQ("partition-id", checkpoints[0].PartitionId); @@ -72,7 +69,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { EXPECT_EQ(101, checkpoints[0].Offset.Value()); checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{ - "$Default", + consumerGroup, "event-hub-name", "ns.servicebus.windows.net", "partition-id", @@ -81,9 +78,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { }); checkpoints = checkpointStore.ListCheckpoints( - "ns.servicebus.windows.net", "event-hub-name", "$Default"); + "ns.servicebus.windows.net", "event-hub-name", consumerGroup); EXPECT_EQ(checkpoints.size(), 1ul); - EXPECT_EQ("$Default", checkpoints[0].ConsumerGroup); + EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup); EXPECT_EQ("event-hub-name", checkpoints[0].EventHubName); EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName); EXPECT_EQ("partition-id", checkpoints[0].PartitionId); @@ -95,10 +92,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { { std::string const testName = GetRandomName(); auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString( - Azure::Core::_internal::Environment::GetVariable( - "CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), - testName, - m_blobClientOptions)}; + GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)}; Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/consumer_client_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/consumer_client_test.cpp index 6fb94295d..a538d7fc8 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/consumer_client_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/consumer_client_test.cpp @@ -22,15 +22,33 @@ void ProcessMessageSuccess(Azure::Core::Amqp::Models::AmqpMessage const& message } // namespace LocalTest namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { class ConsumerClientTest : public EventHubsTestBase { + void SetUp() override + { + EventHubsTestBase::SetUp(); + if (m_testContext.IsLiveMode()) + { + std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING"); + std::string eventHubName = GetEnv("EVENTHUB_NAME"); + + Azure::Messaging::EventHubs::ProducerClient producer{connStringNoEntityPath, eventHubName}; + EventDataBatchOptions eventBatchOptions; + eventBatchOptions.PartitionId = "1"; + EventDataBatch batch{producer.CreateBatch(eventBatchOptions)}; + EXPECT_TRUE(batch.TryAddMessage(Models::EventData{"Test"})); + EXPECT_NO_THROW(producer.Send(batch)); + } + } }; TEST_F(ConsumerClientTest, ConnectionStringNoEntityPath_LIVEONLY_) { std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING"); + std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); + std::string eventHubName = GetEnv("EVENTHUB_NAME"); auto client = Azure::Messaging::EventHubs::ConsumerClient( - connStringNoEntityPath, "eventhub", "$Default"); - EXPECT_EQ("eventhub", client.GetEventHubName()); + connStringNoEntityPath, eventHubName, consumerGroup); + EXPECT_EQ(eventHubName, client.GetEventHubName()); } TEST_F(ConsumerClientTest, ConnectionStringEntityPath_LIVEONLY_) @@ -38,8 +56,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=hehe"; + std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); + std::string eventHubName = GetEnv("EVENTHUB_NAME"); + + // The eventHubName parameter is ignored because the eventhub name is in the connection string. auto client = Azure::Messaging::EventHubs::ConsumerClient( - connStringNoEntityPath, "eventhub", "$DefaultZ"); + connStringNoEntityPath, eventHubName, "$DefaultZ"); EXPECT_EQ("hehe", client.GetEventHubName()); EXPECT_EQ("$DefaultZ", client.GetConsumerGroup()); } @@ -47,8 +69,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { TEST_F(ConsumerClientTest, ConnectionStringEntityPathNoConsumerGroup_LIVEONLY_) { std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING"); - auto client = Azure::Messaging::EventHubs::ConsumerClient(connStringNoEntityPath, "eventhub"); - EXPECT_EQ("eventhub", client.GetEventHubName()); + std::string eventHubName = GetEnv("EVENTHUB_NAME"); + auto client = Azure::Messaging::EventHubs::ConsumerClient(connStringNoEntityPath, eventHubName); + EXPECT_EQ(eventHubName, client.GetEventHubName()); EXPECT_EQ("$Default", client.GetConsumerGroup()); } @@ -121,7 +144,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { options.ApplicationID = "unit-test"; options.Name = "unit-test"; - options.MaxMessageSize = std::numeric_limits::max(); auto client = Azure::Messaging::EventHubs::ConsumerClient(connStringEntityPath); Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp index 271e36cb0..641571b10 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp @@ -33,6 +33,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient); std::string eventHubName{GetEnv("EVENTHUB_NAME")}; + std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + eventHubName; @@ -42,7 +43,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { options.Name = "processor unittest"; auto client = Azure::Messaging::EventHubs::ConsumerClient( - connStringNoEntityPath, eventHubName, "$Default", options); + connStringNoEntityPath, eventHubName, consumerGroup, options); ProcessorOptions processorOptions; processorOptions.LoadBalancingStrategy = Models::ProcessorStrategy::ProcessorStrategyBalanced; processorOptions.UpdateInterval = std::chrono::seconds(2); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/producer_client_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/producer_client_test.cpp index e7960f1b5..13a751e0c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/producer_client_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/producer_client_test.cpp @@ -59,6 +59,9 @@ TEST_F(ProducerClientTest, SendMessage_LIVEONLY_) producerOptions.Name = "sender-link"; producerOptions.ApplicationID = "some"; + auto client = Azure::Messaging::EventHubs::ProducerClient( + connStringEntityPath, eventHubName, producerOptions); + Azure::Core::Amqp::Models::AmqpMessage message2; Azure::Messaging::EventHubs::Models::EventData message1; message2.SetBody(Azure::Core::Amqp::Models::AmqpValue("Hello7")); @@ -71,27 +74,46 @@ TEST_F(ProducerClientTest, SendMessage_LIVEONLY_) Azure::Messaging::EventHubs::EventDataBatchOptions edboptions; edboptions.MaxBytes = std::numeric_limits::max(); edboptions.PartitionId = "1"; - Azure::Messaging::EventHubs::EventDataBatch eventBatch(edboptions); + Azure::Messaging::EventHubs::EventDataBatch eventBatch{client.CreateBatch(edboptions)}; Azure::Messaging::EventHubs::EventDataBatchOptions edboptions2; edboptions2.MaxBytes = std::numeric_limits::max(); ; edboptions2.PartitionId = "2"; - Azure::Messaging::EventHubs::EventDataBatch eventBatch2(edboptions2); + Azure::Messaging::EventHubs::EventDataBatch eventBatch2{client.CreateBatch(edboptions2)}; - eventBatch.AddMessage(message1); - eventBatch.AddMessage(message2); + eventBatch.TryAddMessage(message1); + eventBatch.TryAddMessage(message2); - eventBatch2.AddMessage(message3); - eventBatch2.AddMessage(message2); + eventBatch2.TryAddMessage(message3); + eventBatch2.TryAddMessage(message2); + + for (int i = 0; i < 5; i++) + { + EXPECT_NO_THROW(client.Send(eventBatch)); + } +} + +TEST_F(ProducerClientTest, EventHubRawMessageSend_LIVEONLY_) +{ + std::string eventHubName{GetEnv("EVENTHUB_NAME")}; + std::string const connStringEntityPath + = GetEnv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + eventHubName; + + Azure::Messaging::EventHubs::ProducerClientOptions producerOptions; + producerOptions.Name = "sender-link"; + producerOptions.ApplicationID = "some"; auto client = Azure::Messaging::EventHubs::ProducerClient( connStringEntityPath, eventHubName, producerOptions); - for (int i = 0; i < 5; i++) - { - auto result = client.SendEventDataBatch(eventBatch); - EXPECT_TRUE(result); - } + + client.Send(Azure::Messaging::EventHubs::Models::EventData{"This is a test message"}); + + // Send using the implicit EventData constructor. + client.Send(std::string{"String test message"}); + + // Send using a vector of implicit EventData constructor with a binary buffer. + client.Send({{12, 13, 14, 15}, {16, 17, 18, 19}}); } TEST_F(ProducerClientTest, GetEventHubProperties_LIVEONLY_) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp new file mode 100644 index 000000000..4ca46d857 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp @@ -0,0 +1,155 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +#include "eventhubs_test_base.hpp" + +#include +#include +#include + +#include + +namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { + + class RoundTripTests : public EventHubsTestBase { + }; + + // Round trip a message with a string body using a sequence number filter. + TEST_F(RoundTripTests, SendAndReceiveStringSequenceNumber_LIVEONLY_) + { + std::string const connectionString = GetEnv("EVENTHUB_CONNECTION_STRING"); + std::string const eventHubName = GetEnv("EVENTHUB_NAME"); + std::string const consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); + + int64_t startSequenceNumber = 0; + + { + auto producer = Azure::Messaging::EventHubs::ProducerClient(connectionString, eventHubName); + auto partitionProperties = producer.GetPartitionProperties("1"); + startSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber; + + Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; + batchOptions.PartitionId = "1"; + Azure::Messaging::EventHubs::EventDataBatch eventBatch{producer.CreateBatch(batchOptions)}; + eventBatch.TryAddMessage(Azure::Messaging::EventHubs::Models::EventData("Hello world!")); + EXPECT_NO_THROW(producer.Send(eventBatch)); + } + + { + Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions; + partitionOptions.StartPosition.SequenceNumber = startSequenceNumber; + + auto consumer = Azure::Messaging::EventHubs::ConsumerClient( + connectionString, eventHubName, consumerGroup); + auto receiver = consumer.CreatePartitionClient("1", partitionOptions); + + auto receivedEvents = receiver.ReceiveEvents(1); + ASSERT_EQ(1ul, receivedEvents.size()); + std::vector expected{'H', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'}; + + EXPECT_EQ(expected, receivedEvents[0].Body); + } + } + + // Round trip a message with a binary body using an offset filter. + TEST_F(RoundTripTests, SendAndReceiveBinaryDataOffset_LIVEONLY_) + { + std::string const connectionString = GetEnv("EVENTHUB_CONNECTION_STRING"); + std::string const eventHubName = GetEnv("EVENTHUB_NAME"); + std::string const consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); + + int64_t startOffset = 0; + { + auto producer = Azure::Messaging::EventHubs::ProducerClient(connectionString, eventHubName); + auto partitionProperties = producer.GetPartitionProperties("1"); + startOffset = partitionProperties.LastEnqueuedOffset; + + Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; + batchOptions.PartitionId = "1"; + Azure::Messaging::EventHubs::EventDataBatch eventBatch{producer.CreateBatch(batchOptions)}; + eventBatch.TryAddMessage(Azure::Messaging::EventHubs::Models::EventData({1, 2, 3, 4, 5})); + EXPECT_NO_THROW(producer.Send(eventBatch)); + } + + { + auto consumer = Azure::Messaging::EventHubs::ConsumerClient( + connectionString, eventHubName, consumerGroup); + + Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions; + partitionOptions.StartPosition.Offset = startOffset; + + auto receiver = consumer.CreatePartitionClient("1", partitionOptions); + + auto receivedEvents = receiver.ReceiveEvents(1); + ASSERT_EQ(1ul, receivedEvents.size()); + for (auto const& event : receivedEvents) + { + GTEST_LOG_(INFO) << "Event: " << event; + EXPECT_TRUE(event.EnqueuedTime); + EXPECT_TRUE(event.Offset); + EXPECT_TRUE(event.SequenceNumber); + } + + std::vector expected{1, 2, 3, 4, 5}; + + EXPECT_EQ(expected, receivedEvents[0].Body); + } + } + + // Round trip a message with a binary body using a queued time filter. + TEST_F(RoundTripTests, SendAndReceiveTimestamp_LIVEONLY_) + { + std::string const connectionString = GetEnv("EVENTHUB_CONNECTION_STRING"); + std::string const eventHubName = GetEnv("EVENTHUB_NAME"); + std::string const consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP"); + + Azure::DateTime startTime; + { + auto producer = Azure::Messaging::EventHubs::ProducerClient(connectionString, eventHubName); + auto partitionProperties = producer.GetPartitionProperties("1"); + GTEST_LOG_(INFO) << "Partition Properties: " << partitionProperties; + startTime = partitionProperties.LastEnqueuedTimeUtc + std::chrono::seconds(1); + + GTEST_LOG_(INFO) << "Sleep for a second to reset enqueued time"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + + Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions; + batchOptions.PartitionId = "1"; + Azure::Messaging::EventHubs::EventDataBatch eventBatch{producer.CreateBatch(batchOptions)}; + Azure::Messaging::EventHubs::Models::EventData eventData; + eventData.Body = {1, 2, 3, 4, 5, 6, 7}; + eventData.ContentType = "application/binary"; + eventData.MessageId = "Test Message Id"; + EXPECT_TRUE(eventBatch.TryAddMessage(eventData)); + EXPECT_NO_THROW(producer.Send(eventBatch)); + } + + { + auto consumer = Azure::Messaging::EventHubs::ConsumerClient( + connectionString, eventHubName, consumerGroup); + + Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions; + partitionOptions.StartPosition.EnqueuedTime = startTime; + partitionOptions.StartPosition.Inclusive = false; + + auto receiver = consumer.CreatePartitionClient("1", partitionOptions); + + auto receivedEvents = receiver.ReceiveEvents(1); + ASSERT_EQ(1ul, receivedEvents.size()); + for (auto const& event : receivedEvents) + { + GTEST_LOG_(INFO) << "Event: " << event; + EXPECT_TRUE(event.EnqueuedTime); + EXPECT_TRUE(event.Offset); + EXPECT_TRUE(event.SequenceNumber); + } + std::vector expected{1, 2, 3, 4, 5, 6, 7}; + + EXPECT_EQ(expected, receivedEvents[0].Body); + ASSERT_TRUE(receivedEvents[0].ContentType); + EXPECT_EQ("application/binary", receivedEvents[0].ContentType.Value()); + ASSERT_TRUE(receivedEvents[0].MessageId); + EXPECT_EQ("Test Message Id", static_cast(receivedEvents[0].MessageId.Value())); + } + } + +}}}} // namespace Azure::Messaging::EventHubs::Test diff --git a/sdk/eventhubs/ci.yml b/sdk/eventhubs/ci.yml index 1a129b947..b5343f0d9 100644 --- a/sdk/eventhubs/ci.yml +++ b/sdk/eventhubs/ci.yml @@ -49,6 +49,12 @@ stages: Value: "non-real-secret" - Name: AZURE_SUBSCRIPTION_ID Value: "non-real-sub" + - Name: EVENTHUB_CONSUMER_GROUP + Value: "defaultgroup" + - Name: EVENTHUB_NAME + Value: "non-real-eventhub-name" + - Name: EVENTHUB_CONNECTION_STRING + Value: "Endpoint=sb://notReal.servicebus.windows.net/;SharedAccessKeyName=notReal" - Name: CHECKPOINTSTORE_STORAGE_CONNECTION_STRING Value: "DefaultEndpointsProtocol=https;AccountName=notReal;AccountKey=3333333333333333333333333333333333333333333333333333333333333333333333333333333333333333;EndpointSuffix=core.windows.net"