GeoDR preparation - EventHubs Offset is a string, not an integer encoded as a string (#6461)

* GeoDR preparation - EventHubs Offset is a string, not an integer encoded as a string

* clang-format
This commit is contained in:
Larry Osterman 2025-03-27 12:28:15 -07:00 committed by GitHub
parent ccd7e6b55e
commit 5652d3aecc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 47 additions and 93 deletions

View File

@ -29,7 +29,7 @@ void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpointImpl(
throw std::runtime_error("missing offset number");
}
checkpoint.Offset = std::stol(temp);
checkpoint.Offset = temp;
}
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateOwnership(
@ -59,7 +59,7 @@ Azure::Messaging::EventHubs::BlobCheckpointStore::CreateCheckpointBlobMetadata(
if (checkpoint.Offset.HasValue())
{
metadata["offset"] = std::to_string(checkpoint.Offset.Value());
metadata["offset"] = checkpoint.Offset.Value();
}
return metadata;
}

View File

@ -67,7 +67,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
101,
std::string("101"),
202,
});
@ -86,14 +86,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(101, checkpoints[0].Offset.Value());
EXPECT_EQ("101", checkpoints[0].Offset.Value());
checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
102,
std::string("102"),
203,
});
@ -105,7 +105,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(102, checkpoints[0].Offset.Value());
EXPECT_EQ("102", checkpoints[0].Offset.Value());
}
TEST_P(BlobCheckpointStoreTest, TestOwnerships)

View File

@ -53,7 +53,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
/// @brief The partition ID for the corresponding checkpoint.
std::string PartitionId{};
/// @brief The offset of the last successfully processed event.
Azure::Nullable<int64_t> Offset{};
Azure::Nullable<std::string> Offset{};
/// @brief The sequence number of the last successfully processed event.
Azure::Nullable<int64_t> SequenceNumber{};

View File

@ -127,7 +127,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
* The offset is a marker or identifier for an event within the Event Hubs stream.
* The identifier is unique within a partition of the Event Hubs stream.
*/
Azure::Nullable<std::uint64_t> Offset;
Azure::Nullable<std::string> Offset;
/** @brief The partition key for sending a message to a partition.
*

View File

@ -41,7 +41,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
/** The sequence number of the last observed event to be enqueued in the partition. */
int64_t LastEnqueuedSequenceNumber{};
/** The offset of the last observed event to be enqueued in the partition */
int64_t LastEnqueuedOffset{};
std::string LastEnqueuedOffset{};
/** The date and time, in UTC, that the last observed event was enqueued in the partition. */
Azure::DateTime LastEnqueuedTimeUtc;

View File

@ -21,7 +21,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
*@remark NOTE: offsets are not stable values, and might refer to different events over time
* as the Event Hub events reach their age limit and are discarded.
*/
Azure::Nullable<int64_t> Offset;
Azure::Nullable<std::string> Offset;
/**@brief SequenceNumber will start the consumer after the specified sequence number. Can be
* exclusive or inclusive, based on the Inclusive property.

View File

@ -56,24 +56,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
auto dateTime = Azure::DateTime{Azure::DateTime::time_point{timePoint}};
EnqueuedTime = dateTime;
}
else if (key == _detail::OffsetNumberAnnotation)
else if (key == _detail::OffsetAnnotation)
{
switch (item.second.GetType())
{
case Azure::Core::Amqp::Models::AmqpValueType::Ulong:
Offset = item.second;
break;
case Azure::Core::Amqp::Models::AmqpValueType::Long:
Offset = static_cast<int64_t>(item.second);
break;
case Azure::Core::Amqp::Models::AmqpValueType::Uint:
Offset = static_cast<uint32_t>(item.second);
break;
case Azure::Core::Amqp::Models::AmqpValueType::Int:
Offset = static_cast<int32_t>(item.second);
break;
case Azure::Core::Amqp::Models::AmqpValueType::String:
Offset = std::strtoul(static_cast<std::string>(item.second).c_str(), nullptr, 10);
Offset = static_cast<std::string>(item.second);
break;
default:
break;

View File

@ -65,7 +65,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'"
+ std::to_string(startPosition.Offset.Value()) + "'";
+ startPosition.Offset.Value() + "'";
}
if (startPosition.SequenceNumber.HasValue())
{

View File

@ -13,7 +13,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
constexpr const char* PartitionKeyAnnotation = "x-opt-partition-key";
constexpr const char* SequenceNumberAnnotation = "x-opt-sequence-number";
constexpr const char* OffsetNumberAnnotation = "x-opt-offset";
constexpr const char* OffsetAnnotation = "x-opt-offset";
constexpr const char* EnqueuedTimeAnnotation = "x-opt-enqueued-time";
constexpr const char* EventHubsServiceScheme = "amqps://";

View File

@ -207,8 +207,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
properties.BeginningSequenceNumber = bodyMap["begin_sequence_number"];
properties.LastEnqueuedSequenceNumber = bodyMap["last_enqueued_sequence_number"];
// For <reasons> the last enqueued offset is returned as a string. Convert to an int64.
properties.LastEnqueuedOffset = std::strtoull(
static_cast<std::string>(bodyMap["last_enqueued_offset"]).c_str(), nullptr, 10);
properties.LastEnqueuedOffset = static_cast<std::string>(bodyMap["last_enqueued_offset"]);
properties.LastEnqueuedTimeUtc = Azure::DateTime(std::chrono::system_clock::from_time_t(
std::chrono::duration_cast<std::chrono::seconds>(

View File

@ -31,7 +31,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
{
Azure::Nullable<int64_t> sequenceNumber;
Azure::Nullable<int64_t> offsetNumber;
Azure::Nullable<std::string> offset;
for (auto const& pair : amqpMessage.MessageAnnotations)
{
@ -43,13 +43,10 @@ namespace Azure { namespace Messaging { namespace EventHubs {
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Ulong)
sequenceNumber = static_cast<int64_t>(pair.second);
}
if (pair.first == _detail::OffsetNumberAnnotation)
if (pair.first == _detail::OffsetAnnotation)
{
if (pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Int
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Uint
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Long
|| pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::Ulong)
offsetNumber = static_cast<int64_t>(pair.second);
if (pair.second.GetType() == Azure::Core::Amqp::Models::AmqpValueType::String)
offset = static_cast<std::string>(pair.second);
}
}
@ -58,8 +55,8 @@ namespace Azure { namespace Messaging { namespace EventHubs {
m_consumerClientDetails.EventHubName,
m_consumerClientDetails.FullyQualifiedNamespace,
m_partitionId,
sequenceNumber,
offsetNumber};
offset,
sequenceNumber};
m_checkpointStore->UpdateCheckpoint(checkpoint, context);
}
@ -77,7 +74,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
{
sequenceNumber = eventData->SequenceNumber.Value();
}
uint64_t offset{};
std::string offset{};
if (!eventData->Offset.HasValue())
{
offset = eventData->Offset.Value();

View File

@ -49,7 +49,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
101,
std::string("101"),
202,
});
@ -68,14 +68,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(101, checkpoints[0].Offset.Value());
EXPECT_EQ("101", checkpoints[0].Offset.Value());
checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
102,
std::string("102"),
203,
});
@ -87,7 +87,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(102, checkpoints[0].Offset.Value());
EXPECT_EQ("102", checkpoints[0].Offset.Value());
}
TEST_F(CheckpointStoreTest, TestOwnerships)

View File

@ -204,12 +204,11 @@ TEST_F(EventDataTest, ReceivedEventData)
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
Azure::Messaging::EventHubs::_detail::OffsetAnnotation}
.AsAmqpValue()]
= 54644;
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 54644);
ASSERT_FALSE(receivedEventData.Offset); // Offset must be a string value, not a numeric value.
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
@ -218,12 +217,12 @@ TEST_F(EventDataTest, ReceivedEventData)
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
Azure::Messaging::EventHubs::_detail::OffsetAnnotation}
.AsAmqpValue()]
= "54644";
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 54644);
EXPECT_EQ(receivedEventData.Offset.Value(), "54644");
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
@ -232,54 +231,12 @@ TEST_F(EventDataTest, ReceivedEventData)
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
Azure::Messaging::EventHubs::_detail::OffsetAnnotation}
.AsAmqpValue()]
= static_cast<uint32_t>(53);
= "53";
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
ASSERT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 53);
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
}
{
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<int32_t>(57);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 57);
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
}
{
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<uint64_t>(661011);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 661011);
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);
}
{
std::shared_ptr<Azure::Core::Amqp::Models::AmqpMessage> message{
std::make_shared<Azure::Core::Amqp::Models::AmqpMessage>()};
message->MessageAnnotations[Azure::Core::Amqp::Models::AmqpSymbol{
Azure::Messaging::EventHubs::_detail::OffsetNumberAnnotation}
.AsAmqpValue()]
= static_cast<int64_t>(1412612);
Azure::Messaging::EventHubs::Models::ReceivedEventData receivedEventData(message);
EXPECT_TRUE(receivedEventData.Offset);
EXPECT_EQ(receivedEventData.Offset.Value(), 1412612);
EXPECT_EQ(receivedEventData.Offset.Value(), "53");
EXPECT_FALSE(receivedEventData.SequenceNumber);
EXPECT_FALSE(receivedEventData.EnqueuedTime);
EXPECT_FALSE(receivedEventData.PartitionKey);

View File

@ -584,6 +584,19 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
Azure::Core::Context context{Azure::DateTime::clock::now() + std::chrono::milliseconds(50)};
EXPECT_ANY_THROW(processor.NextPartitionClient(context));
{
auto partitionClientIterator = partitionClients.begin();
auto partitionClient = partitionClientIterator->second;
GTEST_LOG_(INFO) << "Erase client for partition " << partitionClientIterator->first;
partitionClients.erase(partitionClientIterator);
partitionClient->Close();
GTEST_LOG_(INFO) << "Get next partition client after releasing one.";
auto partitionClientNew = processor.NextPartitionClient();
GTEST_LOG_(INFO) << "Found client for partition " << partitionClientNew->PartitionId();
}
while (!partitionClients.empty())
{
auto partitionClientIterator = partitionClients.begin();

View File

@ -49,7 +49,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
// Round trip a message with a binary body using an offset filter.
TEST_P(RoundTripTests, SendAndReceiveBinaryDataOffset_LIVEONLY_)
{
int64_t startOffset = 0;
std::string startOffset = "0";
{
auto producer{CreateProducerClient()};
auto partitionProperties = producer->GetPartitionProperties("1");