Removed storage dependency from eventhubs (#4954)

* Removed storage dependency from eventhubs

---------

Co-authored-by: Anton Kolesnyk <41349689+antkmsft@users.noreply.github.com>
This commit is contained in:
Larry Osterman 2023-09-22 08:06:58 -07:00 committed by GitHub
parent 5f579513c2
commit 336c8c02fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 1078 additions and 324 deletions

View File

@ -90,6 +90,22 @@
},
"architecture": "win32"
},
{
"name": "x64-msvc-static",
"displayName": "Windows x64 MSVC Static",
"description": "Windows Default, MSVC, x64 architecture.",
"inherits": "msvc-windows-default",
"hidden": true,
"cacheVariables": {
"VCPKG_TARGET_TRIPLET": "x64-windows-static",
"MSVC_USE_STATIC_CRT": true
},
"architecture": {
"value": "x64",
"strategy": "external"
}
},
{
"name": "x64",
"displayName": "Windows x64",
@ -305,6 +321,16 @@
"displayName": "x86 MSVC Debug static With Perf Tests and samples",
"inherits": [ "x86-msvc-static", "debug-build", "enable-tests", "enable-perf", "enable-samples", "curl-transport", "winhttp-transport" ]
},
{
"name": "x64-msvc-static-debug-perftests",
"displayName": "x64 MSVC Debug static With Perf Tests and samples",
"inherits": [ "x64-msvc-static", "debug-build", "enable-tests", "enable-perf", "enable-samples", "curl-transport", "winhttp-transport" ]
},
{
"name": "x64-msvc-static-release-perftests",
"displayName": "x64 MSVC Release static With Perf Tests and samples",
"inherits": [ "x64-msvc-static", "release-build", "enable-tests", "enable-perf", "enable-samples", "curl-transport", "winhttp-transport" ]
},
{
"name": "x64-static-release-perftests",
"displayName": "x64 Release With Perf Tests, static",

View File

@ -6,7 +6,7 @@
# Usage: generate_documentation(azure-core 1.0.0-preview.1)
# Requires: Doxygen
# Target name in the form of ${PROJECT_NAME}-docs (e.g. azure-core-docs)
# Note that PROJECT_NAME is also the directory containing the package.
function(generate_documentation PROJECT_NAME PROJECT_VERSION)
if(BUILD_DOCUMENTATION)
find_package(Doxygen 1.9.7 REQUIRED doxygen)

View File

@ -6,6 +6,7 @@ set(AZ_ROOT_DIR "${CMAKE_CURRENT_LIST_DIR}/..")
macro(az_vcpkg_integrate)
message("Vcpkg integrate step.")
# AUTO CMAKE_TOOLCHAIN_FILE:
# User can call `cmake -DCMAKE_TOOLCHAIN_FILE="path_to_the_toolchain"` as the most specific scenario.
# As the last alternative (default case), Azure SDK will automatically clone VCPKG folder and set toolchain from there.
@ -17,7 +18,7 @@ macro(az_vcpkg_integrate)
message("AZURE_SDK_DISABLE_AUTO_VCPKG is not defined. Fetch a local copy of vcpkg.")
# GET VCPKG FROM SOURCE
# User can set env var AZURE_SDK_VCPKG_COMMIT to pick the VCPKG commit to fetch
set(VCPKG_COMMIT_STRING dafef74af53669ef1cc9015f55e0ce809ead62aa) # default SDK tested commit
set(VCPKG_COMMIT_STRING 33409307f1e3411112a0a6bbf3011ea3cca1bfc9) # default SDK tested commit
if(DEFINED ENV{AZURE_SDK_VCPKG_COMMIT})
message("AZURE_SDK_VCPKG_COMMIT is defined. Using that instead of the default.")
set(VCPKG_COMMIT_STRING "$ENV{AZURE_SDK_VCPKG_COMMIT}") # default SDK tested commit

View File

@ -33,6 +33,10 @@ macro(GetFolderList project)
elseif(${project} STREQUAL EVENTHUBS)
DownloadDepVersion(sdk/core azure-core 1.10.1)
DownloadDepVersion(sdk/core azure-core-amqp 1.0.0-beta.1)
elseif(${project} STREQUAL EVENTHUBS_CHECKPOINT_BLOB)
DownloadDepVersion(sdk/core azure-core 1.10.1)
DownloadDepVersion(sdk/core azure-core-amqp 1.0.0-beta.1)
DownloadDepVersion(sdk/eventhubs azure-messaging-eventhubs 1.0.0-beta.3)
DownloadDepVersion(sdk/storage/azure-storage-common azure-storage-common 12.3.3)
DownloadDepVersion(sdk/storage/azure-storage-blobs azure-storage-blobs 12.8.0)
endif()

View File

@ -254,7 +254,7 @@ jobs:
}
displayName: Copy CHANGELOG.md to package artifact
- script: cmake --build . --target ${{ artifact.Name }}-docs
- script: cmake --build . --target ${{ artifact.Path }}-docs
workingDirectory: build
displayName: Generate docs (${{ artifact.Name }}-docs)

View File

@ -9,3 +9,5 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON)
add_subdirectory(azure-messaging-eventhubs)
#add_subdirectory(azure-messaging-eventhubs-checkpointstore-blob)
add_subdirectory(blob-store)

View File

@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "cpp",
"TagPrefix": "cpp/eventhubs",
"Tag": "cpp/eventhubs_ea4655bf2e"
"Tag": "cpp/eventhubs_d59a0a9f3c"
}

View File

@ -49,10 +49,6 @@ elseif(NOT AZ_ALL_LIBRARIES)
find_package(azure-core-amqp-cpp REQUIRED)
endif()
find_package(azure-storage-blobs-cpp CONFIG QUIET)
if(NOT azure-storage-blobs-cpp_FOUND)
find_package(azure-storage-blobs-cpp REQUIRED)
endif()
endif()
set(
@ -110,7 +106,7 @@ target_include_directories(
$<INSTALL_INTERFACE:include>
)
target_link_libraries(azure-messaging-eventhubs PUBLIC Azure::azure-core Azure::azure-core-amqp Azure::azure-storage-blobs)
target_link_libraries(azure-messaging-eventhubs PUBLIC Azure::azure-core Azure::azure-core-amqp)
# coverage. Has no effect if BUILD_CODE_COVERAGE is OFF
create_code_coverage(eventhubs azure-messaging-eventhubs azure-messaging-eventhubs-test "tests?/*;samples?/*")

View File

@ -6,7 +6,6 @@
#include <azure/core/context.hpp>
#include <azure/core/datetime.hpp>
#include <azure/core/nullable.hpp>
#include <azure/storage/blobs.hpp>
#include <sstream>
#include <stdexcept>
@ -65,68 +64,4 @@ namespace Azure { namespace Messaging { namespace EventHubs {
virtual ~CheckpointStore() = default;
};
/** @brief BlobCheckpointStore is an implementation of a CheckpointStore backed by Azure Blob
* Storage.
*/
class BlobCheckpointStore final : public CheckpointStore {
Azure::Storage::Blobs::BlobContainerClient m_containerClient;
void UpdateCheckpointImpl(
Azure::Storage::Metadata const& metadata,
Models::Checkpoint& checkpoint);
void UpdateOwnership(
Azure::Storage::Blobs::Models::BlobItem const& blob,
Models::Ownership& ownership);
Azure::Storage::Metadata CreateCheckpointBlobMetadata(Models::Checkpoint const& checkpoint);
std::pair<Azure::DateTime, Azure::ETag> SetMetadata(
std::string const& blobName,
Azure::Storage::Metadata const& metadata,
Azure::ETag const& etag,
Core::Context const& context = {});
public:
/** @brief Construct a BlobCheckpointStore from another BlobCheckpointStore.
*/
BlobCheckpointStore(BlobCheckpointStore const& other) = default;
/** @brief Assign a BlobCheckpointStore to another BlobCheckpointStore.
*/
BlobCheckpointStore& operator=(BlobCheckpointStore const& other) = default;
/**@brief Construct a BlobCheckpointStore.
*
* @param containerClient An Azure Blob ContainerClient used to hold the checkpoints.
*/
BlobCheckpointStore(Azure::Storage::Blobs::BlobContainerClient const& containerClient)
: CheckpointStore(), m_containerClient(containerClient)
{
m_containerClient.CreateIfNotExists();
}
std::vector<Models::Ownership> ClaimOwnership(
std::vector<Models::Ownership> const& partitionOwnership,
Core::Context const& context = {}) override;
std::vector<Models::Checkpoint> ListCheckpoints(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context = {}) override;
/**@brief ListOwnership lists all ownerships.
*/
std::vector<Models::Ownership> ListOwnership(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context = {}) override;
/**@brief UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
*/
void UpdateCheckpoint(Models::Checkpoint const& checkpoint, Core::Context const& context = {})
override;
};
}}} // namespace Azure::Messaging::EventHubs

View File

@ -3,8 +3,8 @@
#pragma once
#include <azure/core/context.hpp>
#include <azure/core/datetime.hpp>
#include <azure/core/etag.hpp>
#include <azure/core/nullable.hpp>
#include <azure/storage/blobs.hpp>
#include <sstream>
#include <vector>

View File

@ -130,7 +130,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*
* @param context The context to control the request lifetime.
*/
void Run(Core::Context const& context = {})
void Run(Core::Context const& context)
{
Models::EventHubProperties eventHubProperties
= m_consumerClient->GetEventHubProperties(context);
@ -140,12 +140,12 @@ namespace Azure { namespace Messaging { namespace EventHubs {
// = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
// const auto current = std::chrono::system_clock::from_time_t(timeNowSeconds);
// TODO : this is where we re load balance on the update interval
/* while (!context.IsCancelled())
{
std::this_thread::sleep_for(m_ownershipUpdateInterval);
Dispatch(eventHubProperties, consumers, context);
}*/
//// TODO : this is where we re load balance on the update interval
// while (!context.IsCancelled())
//{
// std::this_thread::sleep_for(m_ownershipUpdateInterval);
// Dispatch(eventHubProperties, consumers, context);
//}
}
/** @brief Dispatches events to the appropriate partition clients.

View File

@ -3,6 +3,7 @@
#include "azure/messaging/eventhubs/checkpoint_store.hpp"
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/core/internal/strings.hpp>
#include <stdexcept>
@ -55,211 +56,3 @@ std::string Azure::Messaging::EventHubs::Models::Checkpoint::GetCheckpointBlobNa
}
return GetCheckpointBlobPrefixName() + PartitionId;
}
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpointImpl(
Azure::Storage::Metadata const& metadata,
Checkpoint& checkpoint)
{
std::string temp = metadata.at("sequencenumber");
if (temp.empty())
{
throw std::runtime_error("missing sequence number");
}
checkpoint.SequenceNumber = std::stol(temp);
temp = metadata.at("offset");
if (temp.empty())
{
throw std::runtime_error("missing offset number");
}
checkpoint.Offset = std::stol(temp);
}
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateOwnership(
Azure::Storage::Blobs::Models::BlobItem const& blob,
Ownership& ownership)
{
std::string temp = blob.Details.Metadata.at("ownerid");
if (temp.empty())
{
throw std::runtime_error("missing sequence number");
}
ownership.OwnerId = temp;
ownership.LastModifiedTime = blob.Details.LastModified;
ownership.ETag = blob.Details.ETag;
}
Azure::Storage::Metadata
Azure::Messaging::EventHubs::BlobCheckpointStore::CreateCheckpointBlobMetadata(
Checkpoint const& checkpoint)
{
Azure::Storage::Metadata metadata;
if (checkpoint.SequenceNumber.HasValue())
{
metadata["sequencenumber"] = std::to_string(checkpoint.SequenceNumber.Value());
}
if (checkpoint.Offset.HasValue())
{
metadata["offset"] = std::to_string(checkpoint.Offset.Value());
}
return metadata;
}
std::vector<Ownership> Azure::Messaging::EventHubs::BlobCheckpointStore::ClaimOwnership(
std::vector<Ownership> const& partitionOwnership,
Core::Context const& context)
{
std::vector<Ownership> newOwnerships;
for (Ownership ownership : partitionOwnership)
{
std::string blobName = ownership.GetOwnershipName();
Azure::Storage::Metadata metadata;
metadata["ownerId"] = ownership.OwnerId;
try
{
std::pair<Azure::DateTime, Azure::ETag> result
= SetMetadata(blobName, metadata, ownership.ETag.ValueOr(Azure::ETag()), context);
if (result.second.HasValue())
{
Ownership newOwnership(ownership);
newOwnership.ETag = result.second;
newOwnership.LastModifiedTime = result.first;
newOwnerships.emplace_back(newOwnership);
}
}
catch (...)
{
// we can fail to claim ownership and that's okay - it's expected that clients will
// attempt to claim with whatever state they hold locally. If they fail it just means
// someone else claimed ownership before them.
continue;
}
}
return newOwnerships;
}
std::vector<Checkpoint> Azure::Messaging::EventHubs::BlobCheckpointStore::ListCheckpoints(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context)
{
std::vector<Checkpoint> checkpoints;
std::string prefix = Models::Checkpoint{consumerGroup, eventHubName, fullyQualifiedNamespace}
.GetCheckpointBlobPrefixName();
Azure::Storage::Blobs::ListBlobsOptions listOptions;
listOptions.Prefix = prefix;
listOptions.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
for (auto page = m_containerClient.ListBlobs(listOptions, context); page.HasPage();
page.MoveToNextPage())
{
for (auto& blob : page.Blobs)
{
std::string partitionId = blob.Name.substr(blob.Name.rfind('/') + 1);
Checkpoint c = Checkpoint{consumerGroup, eventHubName, fullyQualifiedNamespace, partitionId};
UpdateCheckpointImpl(blob.Details.Metadata, c);
checkpoints.push_back(c);
}
}
return checkpoints;
}
/**@brief ListOwnership lists all ownerships.
*/
std::vector<Ownership> Azure::Messaging::EventHubs::BlobCheckpointStore::ListOwnership(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context)
{
std::vector<Ownership> ownerships;
std::string prefix
= Ownership{consumerGroup, eventHubName, fullyQualifiedNamespace}.GetOwnershipPrefixName();
Azure::Storage::Blobs::ListBlobsOptions listOptions;
listOptions.Prefix = prefix;
listOptions.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
for (auto page = m_containerClient.ListBlobs(listOptions, context); page.HasPage();
page.MoveToNextPage())
{
for (auto& blob : page.Blobs)
{
std::string partitionId = blob.Name.substr(blob.Name.rfind('/') + 1);
Ownership o{consumerGroup, eventHubName, fullyQualifiedNamespace, partitionId};
UpdateOwnership(blob, o);
ownerships.push_back(o);
}
}
return ownerships;
}
/**@brief UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
*/
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpoint(
Checkpoint const& checkpoint,
Core::Context const& context)
{
std::string blobName = checkpoint.GetCheckpointBlobName();
SetMetadata(blobName, CreateCheckpointBlobMetadata(checkpoint), Azure::ETag(), context);
}
std::pair<Azure::DateTime, Azure::ETag>
Azure::Messaging::EventHubs::BlobCheckpointStore::SetMetadata(
std::string const& blobName,
Azure::Storage::Metadata const& metadata,
Azure::ETag const& etag,
Core::Context const& context)
{
auto blobClient = m_containerClient.GetBlockBlobClient(blobName);
std::pair<Azure::DateTime, Azure::ETag> returnValue;
Azure::Storage::Blobs::SetBlobMetadataOptions options;
try
{
if (etag.HasValue())
{
options.AccessConditions.IfMatch = etag;
}
Azure::Storage::Blobs::Models::SetBlobMetadataResult result
= blobClient.SetMetadata(metadata, options, context).Value;
returnValue = std::make_pair(result.LastModified, result.ETag);
}
catch (Azure::Core::RequestFailedException const& ex)
{
// Ignore HTTP code 412 meaning condition could not be met;
if (ex.StatusCode == Azure::Core::Http::HttpStatusCode::PreconditionFailed)
{
}
if (ex.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
{
Azure::Core::Diagnostics::_internal::Log::Write(
Azure::Core::Diagnostics::Logger::Level::Warning,
"Set Metadata failed with PreconditionFailed or NotFound.; Upload blob content");
std::string blobContent = "";
// throws when blob does not exist , we need to upload the blob in order to create it
std::vector<uint8_t> buffer(blobContent.begin(), blobContent.end());
Azure::Storage::Blobs::UploadBlockBlobFromOptions upOptions;
upOptions.Metadata = metadata;
Azure::Storage::Blobs::Models::UploadBlockBlobFromResult result
= blobClient.UploadFrom(buffer.data(), buffer.size(), upOptions, context).Value;
returnValue = std::make_pair(result.LastModified, result.ETag);
}
else
{
throw;
}
}
return returnValue;
}

View File

@ -2,6 +2,7 @@
// Licensed under the MIT License.
#include "eventhubs_test_base.hpp"
#include "test_checkpoint_store.hpp"
#include <azure/core/context.hpp>
#include <azure/identity.hpp>
@ -12,11 +13,7 @@
namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
class CheckpointStoreTest : public EventHubsTestBase {
virtual void SetUp() override
{
EventHubsTestBase::SetUp();
m_blobClientOptions = InitClientOptions<Azure::Storage::Blobs::BlobClientOptions>();
}
virtual void SetUp() override { EventHubsTestBase::SetUp(); }
protected:
std::string GetRandomName()
@ -32,17 +29,13 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
}
return name;
}
Azure::Storage::Blobs::BlobClientOptions m_blobClientOptions;
};
TEST_F(CheckpointStoreTest, TestCheckpoints)
{
std::string const testName = GetRandomName();
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
Azure::Messaging::EventHubs::Test::TestCheckpointStore checkpointStore;
auto checkpoints = checkpointStore.ListCheckpoints(
"fully-qualified-namespace", "event-hub-name", "consumer-group");
@ -91,10 +84,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
TEST_F(CheckpointStoreTest, TestOwnerships)
{
std::string const testName = GetRandomName();
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
TestCheckpointStore checkpointStore;
auto ownerships = checkpointStore.ListOwnership(
"fully-qualified-namespace", "event-hub-name", "consumer-group");

View File

@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "./test_checkpoint_store.hpp"
#include "eventhubs_test_base.hpp"
#include <azure/core/context.hpp>
@ -26,11 +27,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
TEST_F(ProcessorTest, LoadBalancing_LIVEONLY_)
{
std::string const testName = GetRandomName();
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
Azure::Core::_internal::Environment::GetVariable(
"CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"),
testName)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
std::shared_ptr<Azure::Messaging::EventHubs::CheckpointStore> checkpointStore{
std::make_shared<Azure::Messaging::EventHubs::Test::TestCheckpointStore>()};
std::string eventHubName{GetEnv("EVENTHUB_NAME")};
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
@ -48,11 +46,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
processorOptions.UpdateInterval = std::chrono::seconds(2);
Processor processor(
std::make_shared<ConsumerClient>(client),
std::make_shared<BlobCheckpointStore>(checkpointStore),
processorOptions);
std::make_shared<ConsumerClient>(client), checkpointStore, processorOptions);
processor.Run();
processor.Run({});
GTEST_LOG_(INFO) << "Sleep for 2 seconds to allow the processor to stabilize.";
std::this_thread::sleep_for(std::chrono::seconds(2));

View File

@ -16,7 +16,7 @@
class ProducerClientTest : public EventHubsTestBase {
};
TEST_F(ProducerClientTest, ConnectionStringNoEntityPath_LIVEONLY_)
TEST_F(ProducerClientTest, ConnectionStringNoEntityPath)
{
std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING");
std::string eventHubName{GetEnv("EVENTHUB_NAME")};
@ -25,7 +25,7 @@ TEST_F(ProducerClientTest, ConnectionStringNoEntityPath_LIVEONLY_)
EXPECT_EQ(eventHubName, client.GetEventHubName());
}
TEST_F(ProducerClientTest, ConnectionStringEntityPath_LIVEONLY_)
TEST_F(ProducerClientTest, ConnectionStringEntityPath)
{
std::string eventHubName{GetEnv("EVENTHUB_NAME")};
std::string const connStringEntityPath

View File

@ -2,7 +2,7 @@
"name": "azure-messaging-eventhubs",
"version-string": "1.0.0",
"dependencies": [
"azure-core-amqp-cpp",
"azure-storage-blobs-cpp"
"azure-core-cpp",
"azure-core-amqp-cpp"
]
}

View File

@ -5,7 +5,6 @@
include(CMakeFindDependencyMacro)
find_dependency(azure-core-amqp-cpp)
find_dependency(azure-storage-blobs-cpp)
include("${CMAKE_CURRENT_LIST_DIR}/azure-messaging-eventhubs-cppTargets.cmake")

View File

@ -16,11 +16,6 @@
"default-features": false,
"version>=": "1.0.0-beta.2"
},
{
"name": "azure-storage-blobs-cpp",
"default-features": false,
"version>=": "12.8.0"
},
{
"name": "vcpkg-cmake",
"host": true

View File

@ -0,0 +1,13 @@
# Release History
## 1.0.0-beta.3 (Unreleased)
### Features Added
- Initial release. Split from the `azure-messaging-eventubs` package.
### Breaking Changes
### Bugs Fixed
### Other Changes

View File

@ -0,0 +1,109 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# setting CMAKE_TOOLCHAIN_FILE must happen before creating the project
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/../../../cmake-modules")
include(AzureVcpkg)
az_vcpkg_integrate()
cmake_minimum_required (VERSION 3.13)
project(azure-messaging-eventhubs-checkpoint-blob LANGUAGES CXX)
# Compile Options
option(FETCH_SOURCE_DEPS "build source dependencies" OFF)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON)
if(FETCH_SOURCE_DEPS)
set(AZ_ALL_LIBRARIES ON)
include(FolderList)
SetCompileOptions(EVENTHUBS_CHECKPOINT_BLOB)
endif()
include(AzureVersion)
include(AzureCodeCoverage)
include(AzureTransportAdapters)
include(AzureDoxygen)
include(AzureGlobalCompileOptions)
include(AzureConfigRTTI)
include(AzureBuildTargetForCI)
# Add create_map_file function
include(CreateMapFile)
if(FETCH_SOURCE_DEPS)
GetFolderList(EVENTHUBS)
foreach(oneFolder IN LISTS BUILD_FOLDERS)
message("add folder ${oneFolder}")
add_subdirectory(${oneFolder} EXCLUDE_FROM_ALL)
endforeach()
elseif(NOT AZ_ALL_LIBRARIES)
find_package(azure-core-cpp CONFIG QUIET)
if(NOT azure-core-cpp_FOUND)
find_package(azure-core-cpp REQUIRED)
endif()
find_package(azure-messaging-eventhubs-cpp CONFIG QUIET)
if(NOT azure-core-cpp_FOUND)
find_package(azure-messaging-eventhubs-cpp REQUIRED)
endif()
find_package(azure-storage-blobs-cpp CONFIG QUIET)
if(NOT azure-storage-blobs-cpp_FOUND)
find_package(azure-storage-blobs-cpp REQUIRED)
endif()
endif()
set(
AZURE_MESSAGING_EVENTHUBS_BLOB_CHECKPOINT_HEADER
inc/azure/messaging/eventhubs/checkpointstore_blob/blob_checkpoint_store.hpp
)
set(
AZURE_MESSAGING_EVENTHUBS_BLOB_CHECKPOINT_SOURCE
src/blob_checkpoint_store.cpp
)
add_library(
azure-messaging-eventhubs-checkpoint-blob
${AZURE_MESSAGING_EVENTHUBS_BLOB_CHECKPOINT_HEADER} ${AZURE_MESSAGING_EVENTHUBS_BLOB_CHECKPOINT_SOURCE}
)
create_per_service_target_build(eventhubs azure-messaging-eventhubs-checkpoint-blob)
add_library(Azure::azure-messaging-eventhubs-checkpoint-blob ALIAS azure-messaging-eventhubs-checkpoint-blob)
target_include_directories(
azure-messaging-eventhubs-checkpoint-blob
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/inc>
$<INSTALL_INTERFACE:include>
)
target_link_libraries(azure-messaging-eventhubs-checkpoint-blob
PUBLIC Azure::azure-core Azure::azure-messaging-eventhubs Azure::azure-storage-blobs
)
# coverage. Has no effect if BUILD_CODE_COVERAGE is OFF
create_code_coverage(eventhubs azure-messaging-eventhubs-checkpoint-blob azure-messaging-eventhubs-blobcheckpointstore-test "tests?/*;samples?/*")
get_az_version("${CMAKE_CURRENT_SOURCE_DIR}/src/private/package_version.hpp")
#generate_documentation(azure-messaging-eventhubs-checkpoint-blob ${AZ_LIBRARY_VERSION})
generate_documentation(blob-store ${AZ_LIBRARY_VERSION})
if(BUILD_TESTING)
add_subdirectory(test)
endif()
az_vcpkg_export(
azure-messaging-eventhubs-checkpoint-blob
MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB
azure/messaging/eventhubs/checkpointstore_blob/dll_import_export.hpp
)
az_rtti_setup(
azure-messaging-eventhubs-checkpoint-blob
MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB
azure/messaging/eventhubs/checkpointstore_blob/rtti.hpp
)
unset(FETCH_SOURCE_DEPS CACHE)

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,80 @@
<!-- cspell:words azeventhubs -->
# Azure Event Hubs Blob Storage Checkpoint Store for C++
The EventHubs Blob Storage Checkpoint Store is a checkpoint store for the
[Azure Event Hubs](https://azure.microsoft.com/services/event-hubs/) service, used to enable an
[Event Processor](https://learn.microsoft.com/azure/event-hubs/event-hubs-event-processor-host) to store checkpoints and partition ownership information in Azure Blob Storage.
For information on how to use an EventHubs processor, see the [Azure SDK for C++ EventHubs documentation](https://azure.github.io/azure-sdk-for-cpp/eventhubs.html).
Key links:
- [Source code][source]
- [API Reference Documentation][cppdoc]
- [Product documentation](https://azure.microsoft.com/services/event-hubs/)
- [Samples][cppdoc_examples]
## Getting started
### Install the package
Install the Azure Event Hubs Blob Storage Checkpoint Store for C++ with `vcpkg`:
```bash
vcpkg install azure-messaging-eventhubs-checkpointstore-blob-cpp
```
### Prerequisites
- A C++ Compiler with C++14 support
- An [Azure subscription](https://azure.microsoft.com/free/)
- An [Event Hub namespace](https://docs.microsoft.com/azure/event-hubs/).
- An Event Hub. You can create an event hub in your Event Hubs Namespace using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create), or the [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli).
## Contributing
For details on contributing to this repository, see the [contributing guide][azure_sdk_for_cpp_contributing].
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
### Additional Helpful Links for Contributors
Many people all over the world have helped make this project better. You'll want to check out:
* [What are some good first issues for new contributors to the repo?](https://github.com/azure/azure-sdk-for-cpp/issues?q=is%3Aopen+is%3Aissue+label%3A%22up+for+grabs%22)
* [How to build and test your change][azure_sdk_for_cpp_contributing_developer_guide]
* [How you can make a change happen!][azure_sdk_for_cpp_contributing_pull_requests]
* Frequently Asked Questions (FAQ) and Conceptual Topics in the detailed [Azure SDK for C++ wiki](https://github.com/azure/azure-sdk-for-cpp/wiki).
<!-- ### Community-->
### Reporting security issues and security bugs
Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) <secure@microsoft.com>. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the [Security TechCenter](https://www.microsoft.com/msrc/faqs-report-an-issue).
### License
Azure SDK for C++ is licensed under the [MIT](https://github.com/Azure/azure-sdk-for-cpp/blob/main/LICENSE.txt) license.
<!-- LINKS -->
[azure_sdk_for_cpp_contributing]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md
[azure_sdk_for_cpp_contributing_developer_guide]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md#developer-guide
[azure_sdk_for_cpp_contributing_pull_requests]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md#pull-requests
[consumer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/latest/class_azure_1_1_messaging_1_1_event_hubs_1_1_consumer_client.html
[producer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/1.0.0-beta.1/class_azure_1_1_messaging_1_1_event_hubs_1_1_producer_client.html
[source]: https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs
[azure_identity_pkg]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-identity/latest/index.html
[default_azure_credential]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-identity/latest/index.html#defaultazurecredential
<!-- TODO: Fix go links to refer to C++ documentation when it is published.-->
[cppdoc]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
[cppdoc_examples]: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs#pkg-examples
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-cpp%2Fsdk%2Feventhubs%2FREADME.png)

View File

@ -0,0 +1,103 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#pragma once
#include <azure/core/context.hpp>
#include <azure/core/datetime.hpp>
#include <azure/core/nullable.hpp>
#include <azure/messaging/eventhubs/checkpoint_store.hpp>
#include <azure/messaging/eventhubs/models/checkpoint_store_models.hpp>
#include <azure/storage/blobs.hpp>
#include <sstream>
#include <stdexcept>
#include <vector>
namespace Azure { namespace Messaging { namespace EventHubs {
/** @brief BlobCheckpointStore is an implementation of a CheckpointStore backed by Azure Blob
* Storage.
*/
class BlobCheckpointStore final : public Azure::Messaging::EventHubs::CheckpointStore {
Azure::Storage::Blobs::BlobContainerClient m_containerClient;
void UpdateCheckpointImpl(
Azure::Storage::Metadata const& metadata,
Models::Checkpoint& checkpoint);
void UpdateOwnership(
Azure::Storage::Blobs::Models::BlobItem const& blob,
Models::Ownership& ownership);
Azure::Storage::Metadata CreateCheckpointBlobMetadata(Models::Checkpoint const& checkpoint);
std::pair<Azure::DateTime, Azure::ETag> SetMetadata(
std::string const& blobName,
Azure::Storage::Metadata const& metadata,
Azure::ETag const& etag,
Core::Context const& context = {});
public:
/** @brief Construct a BlobCheckpointStore from another BlobCheckpointStore.
*/
BlobCheckpointStore(BlobCheckpointStore const& other) = default;
/** @brief Assign a BlobCheckpointStore to another BlobCheckpointStore.
*/
BlobCheckpointStore& operator=(BlobCheckpointStore const& other) = default;
/**@brief Construct a BlobCheckpointStore.
*
* @param containerClient An Azure Blob ContainerClient used to hold the checkpoints.
*/
BlobCheckpointStore(Azure::Storage::Blobs::BlobContainerClient const& containerClient)
: Azure::Messaging::EventHubs::CheckpointStore(), m_containerClient(containerClient)
{
m_containerClient.CreateIfNotExists();
}
/**@brief ClaimOwnership Claims ownership for a particular partition.
*
* @param partitionOwnership - The list of partition ownerships this instance is claiming.
* @param context - The context for cancelling long running operations.
*/
std::vector<Models::Ownership> ClaimOwnership(
std::vector<Models::Ownership> const& partitionOwnership,
Core::Context const& context = {}) override;
/**@brief List the checkpoints from storage.
*
* @param fullyQualifiedNamespace - The fully qualified Event Hubs namespace.
* @param eventHubName - The name of the specific Event Hub.
* @param consumerGroup - The name of the specific consumer group.
* @param context - The context for cancelling long running operations.
*
* @return A list of checkpoints.
*/
std::vector<Models::Checkpoint> ListCheckpoints(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context = {}) override;
/**@brief ListOwnership lists all ownerships.
*
* @param fullyQualifiedNamespace - The fully qualified Event Hubs namespace.
* @param eventHubName - The name of the specific Event Hub.
* @param consumerGroup - The name of the specific consumer group.
* @param context - The context for cancelling long running operations.
*
* @return A list of ownerships.
*/
std::vector<Models::Ownership> ListOwnership(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context = {}) override;
/**@brief UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
*/
void UpdateCheckpoint(Models::Checkpoint const& checkpoint, Core::Context const& context = {})
override;
};
}}} // namespace Azure::Messaging::EventHubs

View File

@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @file
* @brief DLL export macro.
*/
// For explanation, see the comment in azure/core/dll_import_export.hpp
#pragma once
/**
* @def AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLLEXPORT
* @brief Applies DLL export attribute, when applicable.
* @note See https://docs.microsoft.com/cpp/cpp/dllexport-dllimport?view=msvc-160.
*/
#if defined(AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLL) \
|| (0 /*@AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLL_INSTALLED_AS_PACKAGE@*/)
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BUILT_AS_DLL 1
#else
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BUILT_AS_DLL 0
#endif
#if AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BUILT_AS_DLL
#if defined(_MSC_VER)
#if defined(AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BEING_BUILT)
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLLEXPORT __declspec(dllexport)
#else // !defined(AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BEING_BUILT)
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLLEXPORT __declspec(dllimport)
#endif // AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BEING_BUILT
#else // !defined(_MSC_VER)
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLLEXPORT
#endif // _MSC_VER
#else // !AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BUILT_AS_DLL
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_DLLEXPORT
#endif // AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BUILT_AS_DLL
#undef AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_BUILT_AS_DLL

View File

@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @file
* @brief Run-time type info enable or disable.
*
* @details Checks whenever RTTI is enabled and exports the symbol
* `AZ_MESSAGING_EVENTHUBS_RTTI`. When the macro is not defined, RTTI is disabled.
*
* @details Each library has this header file. These headers are being configured by
* `az_rtti_setup()` CMake macro. CMake install will patch this file during installation, depending
* on the build flags.
*/
#pragma once
/**
* @def AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_RTTI
* @brief A macro indicating whether the code is built with RTTI or not.
*
* @details `AZ_RTTI` could be defined while building the Azure SDK with CMake, however, after
* the build is completed, that information is not preserved for the code that consumes Azure SDK
* headers, unless the code that consumes the SDK is the part of the same build process. To address
* this issue, CMake install would patch the header it places in the installation directory, so that
* condition:
* `#if defined(AZ_RTTI) || (0)`
* becomes, effectively,
* `#if defined(AZ_RTTI) || (0 + 1)`
* when the library was built with RTTI support, and will make no changes to the
* condition when it was not.
*/
#if defined(AZ_RTTI) || (0 /*@AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_RTTI@*/)
#define AZ_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_RTTI
#endif

View File

@ -0,0 +1,219 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "azure/messaging/eventhubs/checkpointstore_blob/blob_checkpoint_store.hpp"
#include "azure/messaging/eventhubs/checkpoint_store.hpp"
#include <azure/core/internal/diagnostics/log.hpp>
#include <stdexcept>
using namespace Azure::Messaging::EventHubs::Models;
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpointImpl(
Azure::Storage::Metadata const& metadata,
Checkpoint& checkpoint)
{
std::string temp = metadata.at("sequencenumber");
if (temp.empty())
{
throw std::runtime_error("missing sequence number");
}
checkpoint.SequenceNumber = std::stol(temp);
temp = metadata.at("offset");
if (temp.empty())
{
throw std::runtime_error("missing offset number");
}
checkpoint.Offset = std::stol(temp);
}
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateOwnership(
Azure::Storage::Blobs::Models::BlobItem const& blob,
Ownership& ownership)
{
std::string temp = blob.Details.Metadata.at("ownerid");
if (temp.empty())
{
throw std::runtime_error("missing sequence number");
}
ownership.OwnerId = temp;
ownership.LastModifiedTime = blob.Details.LastModified;
ownership.ETag = blob.Details.ETag;
}
Azure::Storage::Metadata
Azure::Messaging::EventHubs::BlobCheckpointStore::CreateCheckpointBlobMetadata(
Checkpoint const& checkpoint)
{
Azure::Storage::Metadata metadata;
if (checkpoint.SequenceNumber.HasValue())
{
metadata["sequencenumber"] = std::to_string(checkpoint.SequenceNumber.Value());
}
if (checkpoint.Offset.HasValue())
{
metadata["offset"] = std::to_string(checkpoint.Offset.Value());
}
return metadata;
}
std::vector<Ownership> Azure::Messaging::EventHubs::BlobCheckpointStore::ClaimOwnership(
std::vector<Ownership> const& partitionOwnership,
Core::Context const& context)
{
std::vector<Ownership> newOwnerships;
for (Ownership ownership : partitionOwnership)
{
std::string blobName = ownership.GetOwnershipName();
Azure::Storage::Metadata metadata;
metadata["ownerId"] = ownership.OwnerId;
try
{
std::pair<Azure::DateTime, Azure::ETag> result
= SetMetadata(blobName, metadata, ownership.ETag.ValueOr(Azure::ETag()), context);
if (result.second.HasValue())
{
Ownership newOwnership(ownership);
newOwnership.ETag = result.second;
newOwnership.LastModifiedTime = result.first;
newOwnerships.emplace_back(newOwnership);
}
}
catch (...)
{
// we can fail to claim ownership and that's okay - it's expected that clients will
// attempt to claim with whatever state they hold locally. If they fail it just means
// someone else claimed ownership before them.
continue;
}
}
return newOwnerships;
}
std::vector<Checkpoint> Azure::Messaging::EventHubs::BlobCheckpointStore::ListCheckpoints(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context)
{
std::vector<Checkpoint> checkpoints;
std::string prefix = Models::Checkpoint{consumerGroup, eventHubName, fullyQualifiedNamespace}
.GetCheckpointBlobPrefixName();
Azure::Storage::Blobs::ListBlobsOptions listOptions;
listOptions.Prefix = prefix;
listOptions.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
for (auto page = m_containerClient.ListBlobs(listOptions, context); page.HasPage();
page.MoveToNextPage())
{
for (auto& blob : page.Blobs)
{
std::string partitionId = blob.Name.substr(blob.Name.rfind('/') + 1);
Checkpoint c = Checkpoint{consumerGroup, eventHubName, fullyQualifiedNamespace, partitionId};
UpdateCheckpointImpl(blob.Details.Metadata, c);
checkpoints.push_back(c);
}
}
return checkpoints;
}
/**@brief ListOwnership lists all ownerships.
*/
std::vector<Ownership> Azure::Messaging::EventHubs::BlobCheckpointStore::ListOwnership(
std::string const& fullyQualifiedNamespace,
std::string const& eventHubName,
std::string const& consumerGroup,
Core::Context const& context)
{
std::vector<Ownership> ownerships;
std::string prefix
= Ownership{consumerGroup, eventHubName, fullyQualifiedNamespace}.GetOwnershipPrefixName();
Azure::Storage::Blobs::ListBlobsOptions listOptions;
listOptions.Prefix = prefix;
listOptions.Include = Azure::Storage::Blobs::Models::ListBlobsIncludeFlags::Metadata;
for (auto page = m_containerClient.ListBlobs(listOptions, context); page.HasPage();
page.MoveToNextPage())
{
for (auto& blob : page.Blobs)
{
std::string partitionId = blob.Name.substr(blob.Name.rfind('/') + 1);
Ownership o{consumerGroup, eventHubName, fullyQualifiedNamespace, partitionId};
UpdateOwnership(blob, o);
ownerships.push_back(o);
}
}
return ownerships;
}
/**@brief UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
*/
void Azure::Messaging::EventHubs::BlobCheckpointStore::UpdateCheckpoint(
Checkpoint const& checkpoint,
Core::Context const& context)
{
std::string blobName = checkpoint.GetCheckpointBlobName();
SetMetadata(blobName, CreateCheckpointBlobMetadata(checkpoint), Azure::ETag(), context);
}
std::pair<Azure::DateTime, Azure::ETag>
Azure::Messaging::EventHubs::BlobCheckpointStore::SetMetadata(
std::string const& blobName,
Azure::Storage::Metadata const& metadata,
Azure::ETag const& etag,
Core::Context const& context)
{
auto blobClient = m_containerClient.GetBlockBlobClient(blobName);
std::pair<Azure::DateTime, Azure::ETag> returnValue;
Azure::Storage::Blobs::SetBlobMetadataOptions options;
try
{
if (etag.HasValue())
{
options.AccessConditions.IfMatch = etag;
}
Azure::Storage::Blobs::Models::SetBlobMetadataResult result
= blobClient.SetMetadata(metadata, options, context).Value;
returnValue = std::make_pair(result.LastModified, result.ETag);
}
catch (Azure::Core::RequestFailedException const& ex)
{
// Ignore HTTP code 412 meaning condition could not be met;
if (ex.StatusCode == Azure::Core::Http::HttpStatusCode::PreconditionFailed)
{
}
if (ex.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound)
{
Azure::Core::Diagnostics::_internal::Log::Write(
Azure::Core::Diagnostics::Logger::Level::Warning,
"Set Metadata failed with PreconditionFailed or NotFound.; Upload blob content");
std::string blobContent = "";
// throws when blob does not exist , we need to upload the blob in order to create it
std::vector<uint8_t> buffer(blobContent.begin(), blobContent.end());
Azure::Storage::Blobs::UploadBlockBlobFromOptions upOptions;
upOptions.Metadata = metadata;
Azure::Storage::Blobs::Models::UploadBlockBlobFromResult result
= blobClient.UploadFrom(buffer.data(), buffer.size(), upOptions, context).Value;
returnValue = std::make_pair(result.LastModified, result.ETag);
}
else
{
throw;
}
}
return returnValue;
}

View File

@ -0,0 +1,70 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
/**
* @file
* @brief Provides version information.
*/
#pragma once
#include <cstdint>
#define AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MAJOR 1
#define AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MINOR 0
#define AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PATCH 0
#define AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PRERELEASE "beta.1"
#define AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA_HELPER(i) #i
#define AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(i) \
AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA_HELPER(i)
namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail {
/**
* @brief Provides version information.
*/
class PackageVersion final {
public:
/**
* @brief Major numeric identifier.
*/
static constexpr int32_t Major = AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MAJOR;
/**
* @brief Minor numeric identifier.
*/
static constexpr int32_t Minor = AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MINOR;
/**
* @brief Patch numeric identifier.
*/
static constexpr int32_t Patch = AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PATCH;
/**
* @brief Indicates whether the SDK is in a pre-release state.
*/
static constexpr bool IsPreRelease
= sizeof(AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PRERELEASE) != sizeof("");
/**
* @brief The version in string format used for telemetry following the `semver.org`
* standard (https://semver.org).
*/
static constexpr const char* ToString()
{
return IsPreRelease
? AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MAJOR) "." AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(
AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MINOR) "." AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PATCH) "-" AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PRERELEASE
: AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MAJOR) "." AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(
AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MINOR) "." AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA(AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PATCH);
}
};
}}}} // namespace Azure::Messaging::EventHubs::_detail
#undef AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA_HELPER
#undef AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_ITOA
#undef AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MAJOR
#undef AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_MINOR
#undef AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PATCH
#undef AZURE_MESSAGING_EVENTHUBS_CHECKPOINTSTORE_BLOB_VERSION_PRERELEASE

View File

@ -0,0 +1,54 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
cmake_minimum_required (VERSION 3.13)
project (azure-messaging-eventhubs-blobstore-test LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
include(GoogleTest)
# Export the test folder for recordings access.
add_compile_definitions(AZURE_TEST_RECORDING_DIR="${CMAKE_CURRENT_LIST_DIR}")
include(TestProxyPrep)
SetUpTestProxy("sdk/eventhubs")
################## Unit Tests ##########################
add_executable (
azure-messaging-eventhubs-blobstore-test
blob_checkpoint_store_test.cpp
)
create_per_service_target_build(eventhubs azure-messaging-eventhubs-blobstore-test)
create_map_file(azure-messaging-eventhubs-blobstore-test azure-messaging-eventhubs-blobstore-test.map)
if (MSVC)
target_compile_options(azure-messaging-eventhubs-blobstore-test PUBLIC /wd6326 /wd26495 /wd26812)
endif()
target_link_libraries(
azure-messaging-eventhubs-blobstore-test
PRIVATE
azure-messaging-eventhubs-checkpoint-blob
azure-messaging-eventhubs
azure-core-test-fw
azure-identity
gtest
gtest_main
gmock
)
# Adding private headers so we can test the private APIs with no relative paths include.
target_include_directories (
azure-messaging-eventhubs-blobstore-test
PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../src>)
# gtest_add_tests will scan the test from azure-messaging-eventhubs-test and call add_test
# for each test to ctest. This enables `ctest -r` to run specific tests directly.
gtest_discover_tests(azure-messaging-eventhubs-blobstore-test
TEST_PREFIX azure-messaging-eventhubs.
NO_PRETTY_TYPES
NO_PRETTY_VALUES
DISCOVERY_TIMEOUT 600)

View File

@ -0,0 +1,158 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "azure/messaging/eventhubs/checkpointstore_blob/blob_checkpoint_store.hpp"
#include "eventhubs_test_base.hpp"
#include <azure/core/context.hpp>
#include <azure/identity.hpp>
#include <azure/messaging/eventhubs.hpp>
namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
class BlobCheckpointStoreTest : public EventHubsTestBase {
virtual void SetUp() override
{
EventHubsTestBase::SetUp();
m_blobClientOptions = InitClientOptions<Azure::Storage::Blobs::BlobClientOptions>();
}
protected:
std::string GetRandomName()
{
std::string name = "checkpoint";
if (m_testContext.IsLiveMode())
{
name.append(Azure::Core::Uuid::CreateUuid().ToString());
}
else
{
name.append("-recording");
}
return name;
}
Azure::Storage::Blobs::BlobClientOptions m_blobClientOptions;
};
TEST_F(BlobCheckpointStoreTest, TestCheckpoints)
{
std::string const testName = GetRandomName();
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
auto checkpoints = checkpointStore.ListCheckpoints(
"fully-qualified-namespace", "event-hub-name", "consumer-group");
EXPECT_EQ(0ul, checkpoints.size());
checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
101,
202,
});
checkpoints = checkpointStore.ListCheckpoints(
"ns.servicebus.windows.net", "event-hub-name", consumerGroup);
EXPECT_EQ(checkpoints.size(), 1ul);
EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup);
EXPECT_EQ("event-hub-name", checkpoints[0].EventHubName);
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(101, checkpoints[0].Offset.Value());
checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
102,
203,
});
checkpoints = checkpointStore.ListCheckpoints(
"ns.servicebus.windows.net", "event-hub-name", consumerGroup);
EXPECT_EQ(checkpoints.size(), 1ul);
EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup);
EXPECT_EQ("event-hub-name", checkpoints[0].EventHubName);
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
EXPECT_EQ(203, checkpoints[0].SequenceNumber.Value());
EXPECT_EQ(102, checkpoints[0].Offset.Value());
}
TEST_F(BlobCheckpointStoreTest, TestOwnerships)
{
std::string const testName = GetRandomName();
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
auto ownerships = checkpointStore.ListOwnership(
"fully-qualified-namespace", "event-hub-name", "consumer-group");
EXPECT_EQ(0ul, ownerships.size());
ownerships = checkpointStore.ClaimOwnership(
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{});
EXPECT_EQ(0ul, ownerships.size());
ownerships = checkpointStore.ClaimOwnership(
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{
Azure::Messaging::EventHubs::Models::Ownership{
"$Default",
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
"owner-id"}});
// Fail the test immediately if there isn't an entry in the ownerships vector.
ASSERT_EQ(1ul, ownerships.size());
EXPECT_EQ("$Default", ownerships[0].ConsumerGroup);
EXPECT_EQ("event-hub-name", ownerships[0].EventHubName);
EXPECT_EQ("ns.servicebus.windows.net", ownerships[0].FullyQualifiedNamespace);
EXPECT_EQ("partition-id", ownerships[0].PartitionId);
EXPECT_EQ("owner-id", ownerships[0].OwnerId);
EXPECT_TRUE(ownerships[0].ETag.HasValue());
EXPECT_TRUE(ownerships[0].LastModifiedTime.HasValue());
Azure::ETag validEtag = ownerships[0].ETag.Value();
// Azure::DateTime lastDatetime = ownerships[0].LastModifiedTime.Value();
//
// This ownership should NOT take precedence over the previous ownership, so the set of
// ownerships returned should be empty.
ownerships = checkpointStore.ClaimOwnership(
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{
Azure::Messaging::EventHubs::Models::Ownership{
"$Default",
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
"owner-id",
Azure::ETag("randomETAG")}});
EXPECT_EQ(0ul, ownerships.size());
ownerships = checkpointStore.ClaimOwnership(
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{
Azure::Messaging::EventHubs::Models::Ownership{
"$Default",
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
"owner-id",
validEtag}});
EXPECT_EQ(1ul, ownerships.size());
EXPECT_NE(validEtag, ownerships[0].ETag.Value());
EXPECT_EQ("$Default", ownerships[0].ConsumerGroup);
EXPECT_EQ("event-hub-name", ownerships[0].EventHubName);
EXPECT_EQ("ns.servicebus.windows.net", ownerships[0].FullyQualifiedNamespace);
EXPECT_EQ("partition-id", ownerships[0].PartitionId);
EXPECT_EQ("owner-id", ownerships[0].OwnerId);
}
}}}} // namespace Azure::Messaging::EventHubs::Test

View File

@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include <azure/core/test/test_base.hpp>
#include <gtest/gtest.h>
class EventHubsTestBase : public Azure::Core::Test::TestBase {
public:
EventHubsTestBase() { TestBase::SetUpTestSuiteLocal(AZURE_TEST_ASSETS_DIR); }
// Create
virtual void SetUp() override
{
Azure::Core::Test::TestBase::SetUpTestBase(AZURE_TEST_RECORDING_DIR);
}
virtual void TearDown() override
{
// Make sure you call the base classes TearDown method to ensure recordings are made.
TestBase::TearDown();
}
};

View File

@ -0,0 +1,10 @@
{
"name": "azure-messaging-eventhubs-checkpointstore-cpp",
"version-string": "1.0.0",
"dependencies": [
"azure-core-cpp",
"azure-core-amqp-cpp",
"azure-messaging-eventhubs-cpp",
"azure-storage-blobs-cpp"
]
}

View File

@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
@PACKAGE_INIT@
include(CMakeFindDependencyMacro)
find_dependency(azure-core-amqp-cpp)
find_dependency(azure-messaging-eventhubs-cpp)
find_dependency(azure-storage-blobs-cpp)
include("${CMAKE_CURRENT_LIST_DIR}/azure-messaging-eventhubs-checkpointstore-blob-cppTargets.cmake")
check_required_components("azure-messaging-eventhubs-checkpointstore-blob-cpp")

View File

@ -0,0 +1,22 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
vcpkg_from_github(
OUT_SOURCE_PATH SOURCE_PATH
REPO Azure/azure-sdk-for-cpp
REF azure-messaging-eventhubs-checkpointstore-blob_@AZ_LIBRARY_VERSION@
SHA512 0
)
vcpkg_cmake_configure(
SOURCE_PATH "${SOURCE_PATH}/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/"
OPTIONS
-DWARNINGS_AS_ERRORS=OFF
-DBUILD_TESTING=OFF
)
vcpkg_cmake_install()
file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/include")
vcpkg_cmake_config_fixup()
file(REMOVE_RECURSE "${CURRENT_PACKAGES_DIR}/debug/share")
vcpkg_copy_pdbs()

View File

@ -0,0 +1,38 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
{
"name": "azure-messaging-eventhubs-checkpointstore-blob-cpp",
"version-semver": "@AZ_LIBRARY_VERSION@",
"description": [
"Microsoft Azure Messaging Event Hubs SDK for C++ Blob Checkpoint Store",
"This library provides an Azure-Storage-Blobs based implementation of an Azure Messaging Event Hubs SDK Checkpoint Store."
],
"homepage": "https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob",
"license": "MIT",
"dependencies": [
{
"name": "azure-core-amqp-cpp",
"default-features": false,
"version>=": "1.0.0-beta.2"
},
{
"name": "azure-messaging-eventhubs",
"default-features": false,
"version>=": "1.0.0-beta.2"
},
{
"name": "azure-storage-blobs-cpp",
"default-features": false,
"version>=": "12.8.0"
},
{
"name": "vcpkg-cmake",
"host": true
},
{
"name": "vcpkg-cmake-config",
"host": true
}
]
}

View File

@ -36,10 +36,16 @@ stages:
- Name: azure-messaging-eventhubs
Path: azure-messaging-eventhubs
VcpkgPortName: azure-messaging-eventhubs-cpp
- Name: azure-messaging-eventhubs-checkpointstore-blob
Path: blob-store
VcpkgPortName: azure-messaging-eventhubs-checkpointstore-blob-cpp
ArtifactsSource:
- Name: azure-messaging-eventhubs
Path: azure-messaging-eventhubs
VcpkgPortName: azure-messaging-eventhubs-cpp
- Name: azure-messaging-eventhubs-checkpointstore-blob
Path: blob-store
VcpkgPortName: azure-messaging-eventhubs-checkpointstore-blob-cpp
TestEnv:
- Name: AZURE_TENANT_ID
Value: "33333333-3333-3333-3333-333333333333"
@ -69,4 +75,7 @@ stages:
Value: '-DBUILD_TESTING=ON -DBUILD_SAMPLES=ON -DBUILD_PERFORMANCE_TESTS=ON'
CMakeSourceTestOptions:
- Name: Source
Value: '-DFETCH_SOURCE_DEPS=OFF'
# FETCH_SOURCE_DEPS is disabled because azure-messaging-eventhubs is not present in the vcpkg catalog at the current Azure SDK baseline commit.
# Disabling DISABLE_AZURE_CORE_OPENTELEMETRY because eventhubs does not have a dependency on azure-core-opentelemetry, but azure core includes
# azure-core-opentelemetry in builds unless disabled.
Value: '-DFETCH_SOURCE_DEPS=OFF -DDISABLE_AZURE_CORE_OPENTELEMETRY=ON'

View File

@ -1,7 +1,7 @@
{
"name": "azure-sdk-for-cpp",
"version": "1.5.0",
"builtin-baseline": "dafef74af53669ef1cc9015f55e0ce809ead62aa",
"builtin-baseline": "33409307f1e3411112a0a6bbf3011ea3cca1bfc9",
"dependencies": [
{
"name": "curl"