From 5652d3aeccd4b2c6e45572ac468179682c6574e6 Mon Sep 17 00:00:00 2001 From: Larry Osterman Date: Thu, 27 Mar 2025 12:28:15 -0700 Subject: [PATCH] GeoDR preparation - EventHubs Offset is a string, not an integer encoded as a string (#6461) * GeoDR preparation - EventHubs Offset is a string, not an integer encoded as a string * clang-format --- .../src/blob_checkpoint_store.cpp | 4 +- .../test/ut/blob_checkpoint_store_test.cpp | 8 +-- .../models/checkpoint_store_models.hpp | 2 +- .../messaging/eventhubs/models/event_data.hpp | 2 +- .../eventhubs/models/management_models.hpp | 2 +- .../models/partition_client_models.hpp | 2 +- .../src/event_data.cpp | 16 +----- .../src/partition_client.cpp | 2 +- .../src/private/eventhubs_constants.hpp | 2 +- .../src/private/eventhubs_utilities.hpp | 3 +- .../src/processor_partition_client.cpp | 17 +++--- .../test/ut/checkpoint_store_test.cpp | 8 +-- .../test/ut/event_data_test.cpp | 57 +++---------------- .../test/ut/processor_test.cpp | 13 +++++ .../test/ut/round_trip_test.cpp | 2 +- 15 files changed, 47 insertions(+), 93 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/blob_checkpoint_store.cpp b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/blob_checkpoint_store.cpp index 1f51cec0d..19cd0e781 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/blob_checkpoint_store.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/src/blob_checkpoint_store.cpp @@ -29,7 +29,7 @@ void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpointImpl( throw std::runtime_error("missing offset number"); } - checkpoint.Offset = std::stol(temp); + checkpoint.Offset = temp; } void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateOwnership( @@ -59,7 +59,7 @@ Azure::Messaging::EventHubs::BlobCheckpointStore::CreateCheckpointBlobMetadata( if (checkpoint.Offset.HasValue()) { - metadata["offset"] = std::to_string(checkpoint.Offset.Value()); + metadata["offset"] = checkpoint.Offset.Value(); } return metadata; } diff --git a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/test/ut/blob_checkpoint_store_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/test/ut/blob_checkpoint_store_test.cpp index 1b7233410..6487653d5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/test/ut/blob_checkpoint_store_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/test/ut/blob_checkpoint_store_test.cpp @@ -67,7 +67,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { "event-hub-name", "ns.servicebus.windows.net", "partition-id", - 101, + std::string("101"), 202, }); @@ -86,14 +86,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName); EXPECT_EQ("partition-id", checkpoints[0].PartitionId); EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value()); - EXPECT_EQ(101, checkpoints[0].Offset.Value()); + EXPECT_EQ("101", checkpoints[0].Offset.Value()); checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{ consumerGroup, "event-hub-name", "ns.servicebus.windows.net", "partition-id", - 102, + std::string("102"), 203, }); @@ -105,7 +105,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName); EXPECT_EQ("partition-id", checkpoints[0].PartitionId); EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value()); - EXPECT_EQ(102, checkpoints[0].Offset.Value()); + EXPECT_EQ("102", checkpoints[0].Offset.Value()); } TEST_P(BlobCheckpointStoreTest, TestOwnerships) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/checkpoint_store_models.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/checkpoint_store_models.hpp index d8455555d..db4be9685 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/checkpoint_store_models.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/checkpoint_store_models.hpp @@ -53,7 +53,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { /// @brief The partition ID for the corresponding checkpoint. std::string PartitionId{}; /// @brief The offset of the last successfully processed event. - Azure::Nullable Offset{}; + Azure::Nullable Offset{}; /// @brief The sequence number of the last successfully processed event. Azure::Nullable SequenceNumber{}; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp index c5844b21b..cdf53042a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/event_data.hpp @@ -127,7 +127,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { * The offset is a marker or identifier for an event within the Event Hubs stream. * The identifier is unique within a partition of the Event Hubs stream. */ - Azure::Nullable Offset; + Azure::Nullable Offset; /** @brief The partition key for sending a message to a partition. * diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp index 5ac0b3102..530701b8f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/management_models.hpp @@ -41,7 +41,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { /** The sequence number of the last observed event to be enqueued in the partition. */ int64_t LastEnqueuedSequenceNumber{}; /** The offset of the last observed event to be enqueued in the partition */ - int64_t LastEnqueuedOffset{}; + std::string LastEnqueuedOffset{}; /** The date and time, in UTC, that the last observed event was enqueued in the partition. */ Azure::DateTime LastEnqueuedTimeUtc; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/partition_client_models.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/partition_client_models.hpp index 66e07981f..532807f08 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/partition_client_models.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/models/partition_client_models.hpp @@ -21,7 +21,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { *@remark NOTE: offsets are not stable values, and might refer to different events over time * as the Event Hub events reach their age limit and are discarded. */ - Azure::Nullable Offset; + Azure::Nullable Offset; /**@brief SequenceNumber will start the consumer after the specified sequence number. Can be * exclusive or inclusive, based on the Inclusive property. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp index 9e04f7848..f0565515c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/event_data.cpp @@ -56,24 +56,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models { auto dateTime = Azure::DateTime{Azure::DateTime::time_point{timePoint}}; EnqueuedTime = dateTime; } - else if (key == _detail::OffsetNumberAnnotation) + else if (key == _detail::OffsetAnnotation) { switch (item.second.GetType()) { - case Azure::Core::Amqp::Models::AmqpValueType::Ulong: - Offset = item.second; - break; - case Azure::Core::Amqp::Models::AmqpValueType::Long: - Offset = static_cast(item.second); - break; - case Azure::Core::Amqp::Models::AmqpValueType::Uint: - Offset = static_cast(item.second); - break; - case Azure::Core::Amqp::Models::AmqpValueType::Int: - Offset = static_cast(item.second); - break; case Azure::Core::Amqp::Models::AmqpValueType::String: - Offset = std::strtoul(static_cast(item.second).c_str(), nullptr, 10); + Offset = static_cast(item.second); break; default: break; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp index bf0c23594..af734ecbd 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp @@ -65,7 +65,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { throw std::runtime_error(expressionErrorText); } returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'" - + std::to_string(startPosition.Offset.Value()) + "'"; + + startPosition.Offset.Value() + "'"; } if (startPosition.SequenceNumber.HasValue()) { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_constants.hpp b/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_constants.hpp index 70210181c..248fcb975 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_constants.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_constants.hpp @@ -13,7 +13,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail constexpr const char* PartitionKeyAnnotation = "x-opt-partition-key"; constexpr const char* SequenceNumberAnnotation = "x-opt-sequence-number"; - constexpr const char* OffsetNumberAnnotation = "x-opt-offset"; + constexpr const char* OffsetAnnotation = "x-opt-offset"; constexpr const char* EnqueuedTimeAnnotation = "x-opt-enqueued-time"; constexpr const char* EventHubsServiceScheme = "amqps://"; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp b/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp index e0b77107a..58d723373 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/private/eventhubs_utilities.hpp @@ -207,8 +207,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail properties.BeginningSequenceNumber = bodyMap["begin_sequence_number"]; properties.LastEnqueuedSequenceNumber = bodyMap["last_enqueued_sequence_number"]; // For the last enqueued offset is returned as a string. Convert to an int64. - properties.LastEnqueuedOffset = std::strtoull( - static_cast(bodyMap["last_enqueued_offset"]).c_str(), nullptr, 10); + properties.LastEnqueuedOffset = static_cast(bodyMap["last_enqueued_offset"]); properties.LastEnqueuedTimeUtc = Azure::DateTime(std::chrono::system_clock::from_time_t( std::chrono::duration_cast( diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/processor_partition_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/processor_partition_client.cpp index ba8899fd5..480c8b5fc 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/processor_partition_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/processor_partition_client.cpp @@ -31,7 +31,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { { Azure::Nullable sequenceNumber; - Azure::Nullable offsetNumber; + Azure::Nullable offset; for (auto const& pair : amqpMessage.MessageAnnotations) { @@ -43,13 +43,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { || pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Ulong) sequenceNumber = static_cast(pair.second); } - if (pair.first == _detail::OffsetNumberAnnotation) + if (pair.first == _detail::OffsetAnnotation) { - if (pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Int - || pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Uint - || pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Long - || pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Ulong) - offsetNumber = static_cast(pair.second); + if (pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::String) + offset = static_cast(pair.second); } } @@ -58,8 +55,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { m_consumerClientDetails.EventHubName, m_consumerClientDetails.FullyQualifiedNamespace, m_partitionId, - sequenceNumber, - offsetNumber}; + offset, + sequenceNumber}; m_checkpointStore->UpdateCheckpoint(checkpoint, context); } @@ -77,7 +74,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { { sequenceNumber = eventData->SequenceNumber.Value(); } - uint64_t offset{}; + std::string offset{}; if (!eventData->Offset.HasValue()) { offset = eventData->Offset.Value(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp index b203dffb0..155fd1234 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/checkpoint_store_test.cpp @@ -49,7 +49,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { "event-hub-name", "ns.servicebus.windows.net", "partition-id", - 101, + std::string("101"), 202, }); @@ -68,14 +68,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName); EXPECT_EQ("partition-id", checkpoints[0].PartitionId); EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value()); - EXPECT_EQ(101, checkpoints[0].Offset.Value()); + EXPECT_EQ("101", checkpoints[0].Offset.Value()); checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{ consumerGroup, "event-hub-name", "ns.servicebus.windows.net", "partition-id", - 102, + std::string("102"), 203, }); @@ -87,7 +87,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName); EXPECT_EQ("partition-id", checkpoints[0].PartitionId); EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value()); - EXPECT_EQ(102, checkpoints[0].Offset.Value()); + EXPECT_EQ("102", checkpoints[0].Offset.Value()); } TEST_F(CheckpointStoreTest, TestOwnerships) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp index a6c18a05e..1f899aeac 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/event_data_test.cpp @@ -204,12 +204,11 @@ TEST_F(EventDataTest, ReceivedEventData) std::shared_ptr message{ std::make_shared()}; message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + Azure::Messaging::EventHubs::_detail::OffsetAnnotation} .AsAmqpValue()] = 54644; Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); - ASSERT_TRUE(receivedEventData.Offset); - EXPECT_EQ(receivedEventData.Offset.Value(), 54644); + ASSERT_FALSE(receivedEventData.Offset); // Offset must be a string value, not a numeric value. EXPECT_FALSE(receivedEventData.SequenceNumber); EXPECT_FALSE(receivedEventData.EnqueuedTime); EXPECT_FALSE(receivedEventData.PartitionKey); @@ -218,12 +217,12 @@ TEST_F(EventDataTest, ReceivedEventData) std::shared_ptr message{ std::make_shared()}; message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + Azure::Messaging::EventHubs::_detail::OffsetAnnotation} .AsAmqpValue()] = "54644"; Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.Offset); - EXPECT_EQ(receivedEventData.Offset.Value(), 54644); + EXPECT_EQ(receivedEventData.Offset.Value(), "54644"); EXPECT_FALSE(receivedEventData.SequenceNumber); EXPECT_FALSE(receivedEventData.EnqueuedTime); EXPECT_FALSE(receivedEventData.PartitionKey); @@ -232,54 +231,12 @@ TEST_F(EventDataTest, ReceivedEventData) std::shared_ptr message{ std::make_shared()}; message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} + Azure::Messaging::EventHubs::_detail::OffsetAnnotation} .AsAmqpValue()] - = static_cast(53); + = "53"; Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); ASSERT_TRUE(receivedEventData.Offset); - EXPECT_EQ(receivedEventData.Offset.Value(), 53); - EXPECT_FALSE(receivedEventData.SequenceNumber); - EXPECT_FALSE(receivedEventData.EnqueuedTime); - EXPECT_FALSE(receivedEventData.PartitionKey); - } - { - std::shared_ptr message{ - std::make_shared()}; - message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} - .AsAmqpValue()] - = static_cast(57); - Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); - EXPECT_TRUE(receivedEventData.Offset); - EXPECT_EQ(receivedEventData.Offset.Value(), 57); - EXPECT_FALSE(receivedEventData.SequenceNumber); - EXPECT_FALSE(receivedEventData.EnqueuedTime); - EXPECT_FALSE(receivedEventData.PartitionKey); - } - { - std::shared_ptr message{ - std::make_shared()}; - message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} - .AsAmqpValue()] - = static_cast(661011); - Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); - EXPECT_TRUE(receivedEventData.Offset); - EXPECT_EQ(receivedEventData.Offset.Value(), 661011); - EXPECT_FALSE(receivedEventData.SequenceNumber); - EXPECT_FALSE(receivedEventData.EnqueuedTime); - EXPECT_FALSE(receivedEventData.PartitionKey); - } - { - std::shared_ptr message{ - std::make_shared()}; - message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{ - Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation} - .AsAmqpValue()] - = static_cast(1412612); - Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message); - EXPECT_TRUE(receivedEventData.Offset); - EXPECT_EQ(receivedEventData.Offset.Value(), 1412612); + EXPECT_EQ(receivedEventData.Offset.Value(), "53"); EXPECT_FALSE(receivedEventData.SequenceNumber); EXPECT_FALSE(receivedEventData.EnqueuedTime); EXPECT_FALSE(receivedEventData.PartitionKey); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp index fbdd06886..98651ec2b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/processor_test.cpp @@ -584,6 +584,19 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { Azure::Core::Context context{Azure::DateTime::clock::now() + std::chrono::milliseconds(50)}; EXPECT_ANY_THROW(processor.NextPartitionClient(context)); + { + auto partitionClientIterator = partitionClients.begin(); + auto partitionClient = partitionClientIterator->second; + GTEST_LOG_(INFO) << "Erase client for partition " << partitionClientIterator->first; + partitionClients.erase(partitionClientIterator); + partitionClient->Close(); + + GTEST_LOG_(INFO) << "Get next partition client after releasing one."; + auto partitionClientNew = processor.NextPartitionClient(); + + GTEST_LOG_(INFO) << "Found client for partition " << partitionClientNew->PartitionId(); + } + while (!partitionClients.empty()) { auto partitionClientIterator = partitionClients.begin(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp index 24df9573e..0df0d8275 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/test/ut/round_trip_test.cpp @@ -49,7 +49,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test { // Round trip a message with a binary body using an offset filter. TEST_P(RoundTripTests, SendAndReceiveBinaryDataOffset_LIVEONLY_) { - int64_t startOffset = 0; + std::string startOffset = "0"; { auto producer{CreateProducerClient()}; auto partitionProperties = producer->GetPartitionProperties("1");