Progress stream reader (#2563)

* Progress stream reader

* format

* Update sdk/core/azure-core/src/io/body_stream.cpp

Co-authored-by: JinmingHu <jinmhu@microsoft.com>

* PR comments

* remove

* one more comment

* replaced if null with azure_assert

* moved from pointer to reference

* PR comments

* clang

Co-authored-by: JinmingHu <jinmhu@microsoft.com>
This commit is contained in:
George Arama 2021-07-20 13:28:48 -07:00 committed by GitHub
parent 157e4875f1
commit 1f15e046e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 179 additions and 0 deletions

View File

@ -3,6 +3,7 @@
## 1.2.0-beta.1 (Unreleased)
### Features Added
- Added `Azure::Core::IO::ProgressBodyStream` type that wraps an existing BodyStream based type stream and reports progress via callback when the stream position is updated.
### Breaking Changes

View File

@ -20,6 +20,7 @@
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <functional>
#include <memory>
#include <vector>
@ -282,4 +283,36 @@ namespace Azure { namespace Core { namespace IO {
int64_t Length() const override;
};
/**
* @brief A concrete implementation of #Azure::Core::IO::BodyStream that wraps another stream and
* reports progress
*/
class ProgressBodyStream : public BodyStream {
private:
BodyStream* m_bodyStream;
int64_t m_bytesTransferred;
std::function<void(int64_t bytesTransferred)> m_callback;
private:
size_t OnRead(uint8_t* buffer, size_t count, Azure::Core::Context const& context) override;
public:
/**
* @brief Constructs `%ProgressBodyStream` from a %BodyStream.
*
* @param bodyStream The body stream to wrap.
*
* @param callback The callback method used to report progress back to the caller.
*
* @remark The #Azure::Core::IO::ProgressBodyStream does not own the wrapped stream
* and is not responsible for closing / cleaning up resources.
*/
ProgressBodyStream(
BodyStream& bodyStream,
std::function<void(int64_t bytesTransferred)> callback);
void Rewind() override;
int64_t Length() const override;
};
}}} // namespace Azure::Core::IO

View File

@ -190,3 +190,31 @@ size_t FileBodyStream::OnRead(uint8_t* buffer, size_t count, Azure::Core::Contex
void FileBodyStream::Rewind() { m_randomAccessFileBodyStream->Rewind(); }
int64_t FileBodyStream::Length() const { return m_randomAccessFileBodyStream->Length(); }
ProgressBodyStream::ProgressBodyStream(
BodyStream& bodyStream,
std::function<void(int64_t bytesTransferred)> callback)
: m_bodyStream(&bodyStream), m_bytesTransferred(0), m_callback(std::move(callback))
{
}
void ProgressBodyStream::Rewind()
{
m_bodyStream->Rewind();
m_bytesTransferred = 0;
m_callback(m_bytesTransferred);
}
size_t ProgressBodyStream::OnRead(
uint8_t* buffer,
size_t count,
Azure::Core::Context const& context)
{
size_t read = m_bodyStream->Read(buffer, count, context);
m_bytesTransferred += read;
m_callback(m_bytesTransferred);
return read;
}
int64_t ProgressBodyStream::Length() const { return m_bodyStream->Length(); }

View File

@ -145,3 +145,120 @@ TEST(FileBodyStream, Read)
EXPECT_EQ(readSize, FileSize);
EXPECT_EQ(buffer[FileSize], 0);
}
TEST(ProgressBodyStream, Init)
{
int64_t bytesTransferred = -1;
std::string testDataPath(AZURE_TEST_DATA_PATH);
testDataPath.append("/fileData");
Azure::Core::IO::FileBodyStream stream(testDataPath);
ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; });
EXPECT_EQ(bytesTransferred, -1);
EXPECT_EQ(progress.Length(), stream.Length());
}
TEST(ProgressBodyStream, ReadChunk)
{
int64_t bytesTransferred = -1;
std::string testDataPath(AZURE_TEST_DATA_PATH);
testDataPath.append("/fileData");
Azure::Core::IO::FileBodyStream stream(testDataPath);
ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; });
std::vector<uint8_t> buffer(30);
size_t readSize = progress.ReadToCount(buffer.data(), 10);
EXPECT_EQ(bytesTransferred, 10);
EXPECT_EQ(progress.Length(), stream.Length());
EXPECT_EQ(readSize, 10);
}
TEST(ProgressBodyStream, MultiWrapProgressStream)
{
int64_t bytesTransferred = -1;
int64_t wrapBytesTransferred = -1;
std::string testDataPath(AZURE_TEST_DATA_PATH);
testDataPath.append("/fileData");
Azure::Core::IO::FileBodyStream stream(testDataPath);
ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; });
ProgressBodyStream progress2(
progress, [&wrapBytesTransferred](int64_t bt) { wrapBytesTransferred = bt; });
std::vector<uint8_t> buffer(30);
size_t readSize = progress2.ReadToCount(buffer.data(), 10);
EXPECT_EQ(bytesTransferred, 10);
EXPECT_EQ(progress.Length(), stream.Length());
EXPECT_EQ(readSize, 10);
EXPECT_EQ(wrapBytesTransferred, 10);
EXPECT_EQ(progress2.Length(), stream.Length());
}
TEST(ProgressBodyStream, ReadMultipleChunks)
{
int64_t bytesTransferred = -1;
std::string testDataPath(AZURE_TEST_DATA_PATH);
testDataPath.append("/fileData");
Azure::Core::IO::FileBodyStream stream(testDataPath);
ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; });
std::vector<uint8_t> buffer(10);
for (int i = 0; i < stream.Length() / 10; i++)
{
size_t readSize = progress.ReadToCount(buffer.data(), 10);
EXPECT_EQ(bytesTransferred, (i + 1) * 10);
EXPECT_EQ(progress.Length(), stream.Length());
EXPECT_EQ(readSize, 10);
}
}
TEST(ProgressBodyStream, ReadMultipleChunksRewind)
{
int64_t bytesTransferred = -1;
std::string testDataPath(AZURE_TEST_DATA_PATH);
testDataPath.append("/fileData");
Azure::Core::IO::FileBodyStream stream(testDataPath);
ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; });
std::vector<uint8_t> buffer(10);
for (int i = 0; i < stream.Length() / 100; i++)
{
size_t readSize = progress.ReadToCount(buffer.data(), 10);
EXPECT_EQ(bytesTransferred, (i + 1) * 10);
EXPECT_EQ(progress.Length(), stream.Length());
EXPECT_EQ(readSize, 10);
}
progress.Rewind();
EXPECT_EQ(bytesTransferred, 0);
EXPECT_EQ(progress.Length(), stream.Length());
for (int i = 0; i < stream.Length() / 100; i++)
{
size_t readSize = progress.ReadToCount(buffer.data(), 10);
EXPECT_EQ(bytesTransferred, (i + 1) * 10);
EXPECT_EQ(progress.Length(), stream.Length());
EXPECT_EQ(readSize, 10);
}
}