azure-sdk-for-cpp/sdk/storage/inc/common/concurrent_transfer.hpp
JinmingHu 66c7518dce
Suggested changes in API review (#244)
* Rename Properties->HttpHeaders

* Rename BlobXxxItems->Items

* Remove MaxResults in response of list operations

* Specify time format in comments for GetUserDelegationKey API

* per operation pipelines and per retry pipelines

* Define some response types and rename ListBlobsFlat

* Add different return types for some APIs

* Rename BlobDownloadInfo->BlobDownloadResponse

* Assign default value to BlobContentLength to suppress compiler warnings

* add concurrent download to buffer

* concurrent upload block blob from buffer
2020-07-07 10:54:05 +08:00

71 lines
1.7 KiB
C++

// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
#pragma once
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <exception>
#include <functional>
#include <future>
#include <stdexcept>
#include <vector>
namespace Azure { namespace Storage { namespace Details {
/**
 * @brief Splits the range [offset, offset + length) into consecutive chunks of at most
 * chunkSize and invokes transferFunc once per chunk, distributing the calls over several
 * threads. The calling thread takes part in the work, so at most (concurrency - 1)
 * additional threads are started, and never more threads than there are chunks.
 *
 * Chunks are handed out in increasing chunk-id order, but may run and complete in any order.
 * The function returns only after every worker has finished.
 *
 * @param offset Start of the range to transfer.
 * @param length Number of units to transfer. Nothing is invoked when this is not positive.
 * @param chunkSize Maximum size of a single chunk; the last chunk may be shorter.
 * @param concurrency Desired number of threads working in parallel. Values below 1 are
 * treated as 1 (everything runs on the calling thread).
 * @param transferFunc Callback invoked as transferFunc(chunkOffset, chunkLength, chunkId,
 * numberOfChunks). It may be called from several threads at the same time.
 *
 * @throws std::invalid_argument if chunkSize is not positive.
 * @throws Whatever transferFunc throws first. After the first failure no new chunks are
 * started, and exceptions from chunks that were already in flight are discarded.
 */
inline void ConcurrentTransfer(
    int64_t offset,
    int64_t length,
    int64_t chunkSize,
    int concurrency,
    // offset, length, chunk id, number of chunks
    std::function<void(int64_t, int64_t, int64_t, int64_t)> transferFunc)
{
  if (chunkSize <= 0)
  {
    throw std::invalid_argument("chunkSize must be greater than zero");
  }
  if (length <= 0)
  {
    return;
  }

  // Ceiling division, written so that it cannot overflow for lengths close to INT64_MAX.
  const int64_t numChunks = length / chunkSize + (length % chunkSize != 0 ? 1 : 0);
  // Never use fewer than one worker (the calling thread) nor more workers than chunks.
  const int64_t numWorkers = std::min<int64_t>(std::max(concurrency, 1), numChunks);

  // 64-bit counter: the number of chunks is itself a 64-bit quantity.
  std::atomic<int64_t> nextChunkId{0};
  std::atomic<bool> failed{false};

  auto threadFunc = [&]() {
    while (true)
    {
      const int64_t chunkId = nextChunkId.fetch_add(1);
      if (chunkId >= numChunks || failed.load())
      {
        break;
      }
      const int64_t consumed = chunkSize * chunkId;
      const int64_t chunkLength = std::min(length - consumed, chunkSize);
      try
      {
        transferFunc(offset + consumed, chunkLength, chunkId, numChunks);
      }
      catch (...)
      {
        // Only the first failure is reported; it also tells every other worker to stop.
        if (!failed.exchange(true))
        {
          throw;
        }
      }
    }
  };

  std::vector<std::future<void>> threadHandles;
  threadHandles.reserve(static_cast<std::size_t>(numWorkers - 1));
  try
  {
    for (int64_t i = 0; i < numWorkers - 1; ++i)
    {
      threadHandles.emplace_back(std::async(std::launch::async, threadFunc));
    }
  }
  catch (...)
  {
    // A thread could not be started: stop the workers that are already running. Their
    // futures block on destruction, so they are joined before the error leaves this scope.
    failed.store(true);
    throw;
  }

  // Run a share of the work on the calling thread, then join every worker before
  // reporting the failure, so nothing still references the locals above.
  std::exception_ptr firstError;
  try
  {
    threadFunc();
  }
  catch (...)
  {
    firstError = std::current_exception();
  }
  for (auto& handle : threadHandles)
  {
    try
    {
      handle.get();
    }
    catch (...)
    {
      if (!firstError)
      {
        firstError = std::current_exception();
      }
    }
  }
  if (firstError)
  {
    std::rethrow_exception(firstError);
  }
}
}}} // namespace Azure::Storage::Details