Switched blob checkpoint store to use both connection string and token credentials. (#5632)
* Switched blob checkpoint store to use both connection string and token credentials; re-recorded tests using token credentials * Added environment variable to ci.yml * Try a slightly different storage url
This commit is contained in:
parent
4415f66cff
commit
260ae116cc
@ -132,6 +132,8 @@ extends:
|
||||
# EventHubs
|
||||
- Name: CHECKPOINTSTORE_STORAGE_CONNECTION_STRING
|
||||
Value: "DefaultEndpointsProtocol=https;AccountName=notReal;AccountKey=3333333333333333333333333333333333333333333333333333333333333333333333333333333333333333;EndpointSuffix=core.windows.net"
|
||||
- Name: CHECKPOINTSTORE_STORAGE_URL
|
||||
Value: "https://non-real-account.blob.core.windows.net/"
|
||||
|
||||
CMakeTestOptions:
|
||||
- Name: Default
|
||||
|
||||
@ -2,5 +2,5 @@
|
||||
"AssetsRepo": "Azure/azure-sdk-assets",
|
||||
"AssetsRepoPrefixPath": "cpp",
|
||||
"TagPrefix": "cpp/eventhubs",
|
||||
"Tag": "cpp/eventhubs_ab9d97a1b9"
|
||||
"Tag": "cpp/eventhubs_54bfb9d8e7"
|
||||
}
|
||||
|
||||
@ -7,7 +7,8 @@
|
||||
#include <azure/core/nullable.hpp>
|
||||
#include <azure/messaging/eventhubs/checkpoint_store.hpp>
|
||||
#include <azure/messaging/eventhubs/models/checkpoint_store_models.hpp>
|
||||
#include <azure/storage/blobs.hpp>
|
||||
#include <azure/storage/blobs/blob_container_client.hpp>
|
||||
#include <azure/storage/blobs/block_blob_client.hpp>
|
||||
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
|
||||
@ -4,10 +4,6 @@
|
||||
#include "azure/messaging/eventhubs/checkpointstore_blob/blob_checkpoint_store.hpp"
|
||||
#include "eventhubs_test_base.hpp"
|
||||
|
||||
#include <azure/core/context.hpp>
|
||||
#include <azure/identity.hpp>
|
||||
#include <azure/messaging/eventhubs.hpp>
|
||||
|
||||
namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
|
||||
class BlobCheckpointStoreTest : public EventHubsTestBase {
|
||||
@ -32,23 +28,46 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
return name;
|
||||
}
|
||||
|
||||
Azure::Storage::Blobs::BlobContainerClient CreateBlobContainerClient(
|
||||
std::string const& testName)
|
||||
{
|
||||
if (GetParam() == AuthType::ConnectionString)
|
||||
{
|
||||
return Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
|
||||
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions);
|
||||
}
|
||||
else if (GetParam() == AuthType::ManagedIdentity)
|
||||
{
|
||||
return Azure::Storage::Blobs::BlobContainerClient(
|
||||
GetEnv("CHECKPOINTSTORE_STORAGE_URL") + "/" + testName,
|
||||
GetTestCredential(),
|
||||
m_blobClientOptions);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::runtime_error("AuthType not supported");
|
||||
}
|
||||
}
|
||||
|
||||
Azure::Storage::Blobs::BlobClientOptions m_blobClientOptions;
|
||||
};
|
||||
|
||||
TEST_F(BlobCheckpointStoreTest, TestCheckpoints_LIVEONLY_)
|
||||
TEST_P(BlobCheckpointStoreTest, TestCheckpoints)
|
||||
{
|
||||
std::string const testName = GetRandomName();
|
||||
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
|
||||
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
|
||||
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
|
||||
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
|
||||
|
||||
auto checkpoints = checkpointStore.ListCheckpoints(
|
||||
auto containerClient{CreateBlobContainerClient(testName)};
|
||||
|
||||
std::shared_ptr<CheckpointStore> checkpointStore{
|
||||
std::make_shared<Azure::Messaging::EventHubs::BlobCheckpointStore>(containerClient)};
|
||||
|
||||
auto checkpoints = checkpointStore->ListCheckpoints(
|
||||
"fully-qualified-namespace", "event-hub-name", "consumer-group");
|
||||
|
||||
EXPECT_EQ(0ul, checkpoints.size());
|
||||
|
||||
checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
|
||||
checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
|
||||
consumerGroup,
|
||||
"event-hub-name",
|
||||
"ns.servicebus.windows.net",
|
||||
@ -57,7 +76,14 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
202,
|
||||
});
|
||||
|
||||
checkpoints = checkpointStore.ListCheckpoints(
|
||||
{
|
||||
// There still should be no checkpoints in the partition we first queried.
|
||||
checkpoints = checkpointStore->ListCheckpoints(
|
||||
"fully-qualified-namespace", "event-hub-name", "consumer-group");
|
||||
EXPECT_EQ(0ul, checkpoints.size());
|
||||
}
|
||||
|
||||
checkpoints = checkpointStore->ListCheckpoints(
|
||||
"ns.servicebus.windows.net", "event-hub-name", consumerGroup);
|
||||
EXPECT_EQ(checkpoints.size(), 1ul);
|
||||
EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup);
|
||||
@ -67,7 +93,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
EXPECT_EQ(202, checkpoints[0].SequenceNumber.Value());
|
||||
EXPECT_EQ(101, checkpoints[0].Offset.Value());
|
||||
|
||||
checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
|
||||
checkpointStore->UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
|
||||
consumerGroup,
|
||||
"event-hub-name",
|
||||
"ns.servicebus.windows.net",
|
||||
@ -76,7 +102,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
203,
|
||||
});
|
||||
|
||||
checkpoints = checkpointStore.ListCheckpoints(
|
||||
checkpoints = checkpointStore->ListCheckpoints(
|
||||
"ns.servicebus.windows.net", "event-hub-name", consumerGroup);
|
||||
EXPECT_EQ(checkpoints.size(), 1ul);
|
||||
EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup);
|
||||
@ -87,23 +113,24 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
EXPECT_EQ(102, checkpoints[0].Offset.Value());
|
||||
}
|
||||
|
||||
TEST_F(BlobCheckpointStoreTest, TestOwnerships_LIVEONLY_)
|
||||
TEST_P(BlobCheckpointStoreTest, TestOwnerships)
|
||||
{
|
||||
std::string const testName = GetRandomName();
|
||||
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
|
||||
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
|
||||
|
||||
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
|
||||
auto containerClient{CreateBlobContainerClient(testName)};
|
||||
|
||||
auto ownerships = checkpointStore.ListOwnership(
|
||||
std::shared_ptr<CheckpointStore> checkpointStore{
|
||||
std::make_shared<Azure::Messaging::EventHubs::BlobCheckpointStore>(containerClient)};
|
||||
|
||||
auto ownerships = checkpointStore->ListOwnership(
|
||||
"fully-qualified-namespace", "event-hub-name", "consumer-group");
|
||||
EXPECT_EQ(0ul, ownerships.size());
|
||||
|
||||
ownerships = checkpointStore.ClaimOwnership(
|
||||
ownerships = checkpointStore->ClaimOwnership(
|
||||
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{});
|
||||
EXPECT_EQ(0ul, ownerships.size());
|
||||
|
||||
ownerships = checkpointStore.ClaimOwnership(
|
||||
ownerships = checkpointStore->ClaimOwnership(
|
||||
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{
|
||||
Azure::Messaging::EventHubs::Models::Ownership{
|
||||
"$Default",
|
||||
@ -126,7 +153,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
//
|
||||
// This ownership should NOT take precedence over the previous ownership, so the set of
|
||||
// ownerships returned should be empty.
|
||||
ownerships = checkpointStore.ClaimOwnership(
|
||||
ownerships = checkpointStore->ClaimOwnership(
|
||||
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{
|
||||
Azure::Messaging::EventHubs::Models::Ownership{
|
||||
"$Default",
|
||||
@ -137,7 +164,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
Azure::ETag("randomETAG")}});
|
||||
EXPECT_EQ(0ul, ownerships.size());
|
||||
|
||||
ownerships = checkpointStore.ClaimOwnership(
|
||||
ownerships = checkpointStore->ClaimOwnership(
|
||||
std::vector<Azure::Messaging::EventHubs::Models::Ownership>{
|
||||
Azure::Messaging::EventHubs::Models::Ownership{
|
||||
"$Default",
|
||||
@ -155,4 +182,27 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
|
||||
EXPECT_EQ("partition-id", ownerships[0].PartitionId);
|
||||
EXPECT_EQ("owner-id", ownerships[0].OwnerId);
|
||||
}
|
||||
|
||||
namespace {
|
||||
static std::string GetSuffix(const testing::TestParamInfo<AuthType>& info)
|
||||
{
|
||||
std::string stringValue = "";
|
||||
switch (info.param)
|
||||
{
|
||||
case AuthType::ConnectionString:
|
||||
stringValue = "ConnectionString_LIVEONLY_";
|
||||
break;
|
||||
case AuthType::ManagedIdentity:
|
||||
stringValue = "ManagedIdentity";
|
||||
break;
|
||||
}
|
||||
return stringValue;
|
||||
}
|
||||
} // namespace
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
EventHubs,
|
||||
BlobCheckpointStoreTest,
|
||||
::testing::Values(AuthType::ManagedIdentity, AuthType::ConnectionString),
|
||||
GetSuffix);
|
||||
|
||||
}}}} // namespace Azure::Messaging::EventHubs::Test
|
||||
|
||||
@ -5,7 +5,14 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
class EventHubsTestBase : public Azure::Core::Test::TestBase {
|
||||
enum class AuthType
|
||||
{
|
||||
ManagedIdentity,
|
||||
ConnectionString,
|
||||
};
|
||||
|
||||
class EventHubsTestBase : public Azure::Core::Test::TestBase,
|
||||
public ::testing::WithParamInterface<AuthType> {
|
||||
public:
|
||||
EventHubsTestBase() { TestBase::SetUpTestSuiteLocal(AZURE_TEST_ASSETS_DIR); }
|
||||
// Create
|
||||
@ -18,4 +25,6 @@ public:
|
||||
// Make sure you call the base classes TearDown method to ensure recordings are made.
|
||||
TestBase::TearDown();
|
||||
}
|
||||
|
||||
protected:
|
||||
};
|
||||
|
||||
@ -247,6 +247,10 @@
|
||||
"type": "string",
|
||||
"value": "[variables('consumerGroup')]"
|
||||
},
|
||||
"CHECKPOINTSTORE_STORAGE_URL": {
|
||||
"type": "string",
|
||||
"value": "[concat('https://', variables('storageAccount'), '.blob.', parameters('storageEndpointSuffix'))]"
|
||||
},
|
||||
"CHECKPOINTSTORE_STORAGE_CONNECTION_STRING": {
|
||||
"type": "string",
|
||||
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
|
||||
|
||||
@ -59,6 +59,8 @@ extends:
|
||||
Value: "defaultgroup"
|
||||
- Name: EVENTHUB_NAME
|
||||
Value: "non-real-eventhub-name"
|
||||
- Name: CHECKPOINTSTORE_STORAGE_URL
|
||||
Value: "https://non-real-account.blob.core.windows.net"
|
||||
- Name: EVENTHUB_CONNECTION_STRING
|
||||
Value: "Endpoint=sb://notReal.servicebus.windows.net/;SharedAccessKeyName=notReal"
|
||||
- Name: CHECKPOINTSTORE_STORAGE_CONNECTION_STRING
|
||||
|
||||
Loading…
Reference in New Issue
Block a user