From 1f15e046e065190b489be912567961443236efc5 Mon Sep 17 00:00:00 2001 From: George Arama <50641385+gearama@users.noreply.github.com> Date: Tue, 20 Jul 2021 13:28:48 -0700 Subject: [PATCH] Progress stream reader (#2563) * Progress stream reader * format * Update sdk/core/azure-core/src/io/body_stream.cpp Co-authored-by: JinmingHu * PR comments * remove * one more comment * replaced if null with azure_assert * moved from pointer to reference * PR comments * clang Co-authored-by: JinmingHu --- sdk/core/azure-core/CHANGELOG.md | 1 + .../inc/azure/core/io/body_stream.hpp | 33 +++++ sdk/core/azure-core/src/io/body_stream.cpp | 28 +++++ .../azure-core/test/ut/bodystream_test.cpp | 117 ++++++++++++++++++ 4 files changed, 179 insertions(+) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 00d78fc12..c62c4c789 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -3,6 +3,7 @@ ## 1.2.0-beta.1 (Unreleased) ### Features Added +- Added `Azure::Core::IO::ProgressBodyStream` type that wraps an existing BodyStream based type stream and reports progress via callback when the stream position is updated. ### Breaking Changes diff --git a/sdk/core/azure-core/inc/azure/core/io/body_stream.hpp b/sdk/core/azure-core/inc/azure/core/io/body_stream.hpp index 24e0e6084..845877a22 100644 --- a/sdk/core/azure-core/inc/azure/core/io/body_stream.hpp +++ b/sdk/core/azure-core/inc/azure/core/io/body_stream.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -282,4 +283,36 @@ namespace Azure { namespace Core { namespace IO { int64_t Length() const override; }; + /** + * @brief A concrete implementation of #Azure::Core::IO::BodyStream that wraps another stream and + * reports progress + */ + class ProgressBodyStream : public BodyStream { + private: + BodyStream* m_bodyStream; + int64_t m_bytesTransferred; + std::function m_callback; + + private: + size_t OnRead(uint8_t* buffer, size_t count, Azure::Core::Context const& context) override; + + public: + /** + * @brief Constructs `%ProgressBodyStream` from a %BodyStream. + * + * @param bodyStream The body stream to wrap. + * + * @param callback The callback method used to report progress back to the caller. + * + * @remark The #Azure::Core::IO::ProgressBodyStream does not own the wrapped stream + * and is not responsible for closing / cleaning up resources. + */ + ProgressBodyStream( + BodyStream& bodyStream, + std::function callback); + + void Rewind() override; + + int64_t Length() const override; + }; }}} // namespace Azure::Core::IO diff --git a/sdk/core/azure-core/src/io/body_stream.cpp b/sdk/core/azure-core/src/io/body_stream.cpp index 020d40182..2fc536cde 100644 --- a/sdk/core/azure-core/src/io/body_stream.cpp +++ b/sdk/core/azure-core/src/io/body_stream.cpp @@ -190,3 +190,31 @@ size_t FileBodyStream::OnRead(uint8_t* buffer, size_t count, Azure::Core::Contex void FileBodyStream::Rewind() { m_randomAccessFileBodyStream->Rewind(); } int64_t FileBodyStream::Length() const { return m_randomAccessFileBodyStream->Length(); } + +ProgressBodyStream::ProgressBodyStream( + BodyStream& bodyStream, + std::function callback) + : m_bodyStream(&bodyStream), m_bytesTransferred(0), m_callback(std::move(callback)) +{ +} + +void ProgressBodyStream::Rewind() +{ + m_bodyStream->Rewind(); + m_bytesTransferred = 0; + m_callback(m_bytesTransferred); +} + +size_t ProgressBodyStream::OnRead( + uint8_t* buffer, + size_t count, + Azure::Core::Context const& context) +{ + size_t read = m_bodyStream->Read(buffer, count, context); + m_bytesTransferred += read; + m_callback(m_bytesTransferred); + + return read; +} + +int64_t ProgressBodyStream::Length() const { return m_bodyStream->Length(); } diff --git a/sdk/core/azure-core/test/ut/bodystream_test.cpp b/sdk/core/azure-core/test/ut/bodystream_test.cpp index 528177ef7..30e1a3e77 100644 --- a/sdk/core/azure-core/test/ut/bodystream_test.cpp +++ b/sdk/core/azure-core/test/ut/bodystream_test.cpp @@ -145,3 +145,120 @@ TEST(FileBodyStream, Read) EXPECT_EQ(readSize, FileSize); EXPECT_EQ(buffer[FileSize], 0); } + +TEST(ProgressBodyStream, Init) +{ + int64_t bytesTransferred = -1; + std::string testDataPath(AZURE_TEST_DATA_PATH); + testDataPath.append("/fileData"); + + Azure::Core::IO::FileBodyStream stream(testDataPath); + + ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; }); + + EXPECT_EQ(bytesTransferred, -1); + EXPECT_EQ(progress.Length(), stream.Length()); +} + +TEST(ProgressBodyStream, ReadChunk) +{ + int64_t bytesTransferred = -1; + std::string testDataPath(AZURE_TEST_DATA_PATH); + testDataPath.append("/fileData"); + + Azure::Core::IO::FileBodyStream stream(testDataPath); + + ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; }); + + std::vector buffer(30); + + size_t readSize = progress.ReadToCount(buffer.data(), 10); + + EXPECT_EQ(bytesTransferred, 10); + EXPECT_EQ(progress.Length(), stream.Length()); + EXPECT_EQ(readSize, 10); +} + +TEST(ProgressBodyStream, MultiWrapProgressStream) +{ + int64_t bytesTransferred = -1; + int64_t wrapBytesTransferred = -1; + std::string testDataPath(AZURE_TEST_DATA_PATH); + testDataPath.append("/fileData"); + + Azure::Core::IO::FileBodyStream stream(testDataPath); + + ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; }); + ProgressBodyStream progress2( + progress, [&wrapBytesTransferred](int64_t bt) { wrapBytesTransferred = bt; }); + std::vector buffer(30); + + size_t readSize = progress2.ReadToCount(buffer.data(), 10); + + EXPECT_EQ(bytesTransferred, 10); + EXPECT_EQ(progress.Length(), stream.Length()); + EXPECT_EQ(readSize, 10); + + EXPECT_EQ(wrapBytesTransferred, 10); + EXPECT_EQ(progress2.Length(), stream.Length()); +} + +TEST(ProgressBodyStream, ReadMultipleChunks) +{ + int64_t bytesTransferred = -1; + std::string testDataPath(AZURE_TEST_DATA_PATH); + testDataPath.append("/fileData"); + + Azure::Core::IO::FileBodyStream stream(testDataPath); + + ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; }); + + std::vector buffer(10); + + for (int i = 0; i < stream.Length() / 10; i++) + { + size_t readSize = progress.ReadToCount(buffer.data(), 10); + + EXPECT_EQ(bytesTransferred, (i + 1) * 10); + EXPECT_EQ(progress.Length(), stream.Length()); + EXPECT_EQ(readSize, 10); + } +} + +TEST(ProgressBodyStream, ReadMultipleChunksRewind) +{ + int64_t bytesTransferred = -1; + std::string testDataPath(AZURE_TEST_DATA_PATH); + testDataPath.append("/fileData"); + + Azure::Core::IO::FileBodyStream stream(testDataPath); + + ProgressBodyStream progress(stream, [&bytesTransferred](int64_t bt) { bytesTransferred = bt; }); + + std::vector buffer(10); + + for (int i = 0; i < stream.Length() / 100; i++) + { + + size_t readSize = progress.ReadToCount(buffer.data(), 10); + + EXPECT_EQ(bytesTransferred, (i + 1) * 10); + EXPECT_EQ(progress.Length(), stream.Length()); + EXPECT_EQ(readSize, 10); + } + + progress.Rewind(); + + EXPECT_EQ(bytesTransferred, 0); + EXPECT_EQ(progress.Length(), stream.Length()); + + for (int i = 0; i < stream.Length() / 100; i++) + { + + size_t readSize = progress.ReadToCount(buffer.data(), 10); + + EXPECT_EQ(bytesTransferred, (i + 1) * 10); + EXPECT_EQ(progress.Length(), stream.Length()); + EXPECT_EQ(readSize, 10); + } +}