From 9bae3fbd5e889315e77d02eb669461fecee786de Mon Sep 17 00:00:00 2001 From: HolyLow Date: Tue, 25 Mar 2025 17:54:34 +0800 Subject: [PATCH] [CELEBORN-1915][CIP-14] Add reader's ShuffleClient to cppClient ### What changes were proposed in this pull request? This PR adds reader end's ShuffleClient to cppClient. ### Why are the changes needed? ShuffleClient is the user interface for cppClient usage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Compilation. Closes #3156 from HolyLow/issue/celeborn-1915-add-reader-shuffle-client-to-cppclient. Authored-by: HolyLow Signed-off-by: mingji --- cpp/celeborn/client/CMakeLists.txt | 3 +- cpp/celeborn/client/ShuffleClient.cpp | 141 ++++++++++++++++++++++++++ cpp/celeborn/client/ShuffleClient.h | 86 ++++++++++++++++ cpp/celeborn/utils/CelebornUtils.cpp | 4 + cpp/celeborn/utils/CelebornUtils.h | 12 +++ 5 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 cpp/celeborn/client/ShuffleClient.cpp create mode 100644 cpp/celeborn/client/ShuffleClient.h diff --git a/cpp/celeborn/client/CMakeLists.txt b/cpp/celeborn/client/CMakeLists.txt index f9edfe949..2491b7d7f 100644 --- a/cpp/celeborn/client/CMakeLists.txt +++ b/cpp/celeborn/client/CMakeLists.txt @@ -15,7 +15,8 @@ add_library( client reader/WorkerPartitionReader.cpp - reader/CelebornInputStream.cpp) + reader/CelebornInputStream.cpp + ShuffleClient.cpp) target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR}) diff --git a/cpp/celeborn/client/ShuffleClient.cpp b/cpp/celeborn/client/ShuffleClient.cpp new file mode 100644 index 000000000..35e593d1b --- /dev/null +++ b/cpp/celeborn/client/ShuffleClient.cpp @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "celeborn/client/ShuffleClient.h" + +#include "celeborn/utils/CelebornUtils.h" + +namespace celeborn { +namespace client { +ShuffleClientImpl::ShuffleClientImpl( + const std::string& appUniqueId, + const std::shared_ptr& conf, + const std::shared_ptr& clientFactory) + : appUniqueId_(appUniqueId), conf_(conf), clientFactory_(clientFactory) {} + +void ShuffleClientImpl::setupLifecycleManagerRef(std::string& host, int port) { + auto managerClient = clientFactory_->createClient(host, port); + { + std::lock_guard lock(mutex_); + lifecycleManagerRef_ = std::make_shared( + "LifecycleManagerEndpoint", "dummy", 0, host, port, managerClient); + } +} + +void ShuffleClientImpl::setupLifecycleManagerRef( + std::shared_ptr& lifecycleManagerRef) { + std::lock_guard lock(mutex_); + lifecycleManagerRef_ = lifecycleManagerRef; +} + +std::unique_ptr ShuffleClientImpl::readPartition( + int shuffleId, + int partitionId, + int attemptNumber, + int startMapIndex, + int endMapIndex) { + const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId); + std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId); + std::vector> locations; + if (!reducerFileGroupInfo.fileGroups.empty() && + reducerFileGroupInfo.fileGroups.count(partitionId)) { + locations = std::move( + utils::toVector( + reducerFileGroupInfo.fileGroups.find(partitionId)->second)); + } + return std::make_unique( + shuffleKey, + conf_, + clientFactory_, + std::move(locations), + reducerFileGroupInfo.attempts, + attemptNumber, + startMapIndex, + endMapIndex); +} + +void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) { + CELEBORN_CHECK( + lifecycleManagerRef_, "lifecycleManagerRef_ is not initialized"); + // Send the query request to lifecycleManager. + auto reducerFileGroupInfo = lifecycleManagerRef_->askSync( + protocol::GetReducerFileGroup{shuffleId}, + conf_->clientRpcGetReducerFileGroupRpcAskTimeout()); + + switch (reducerFileGroupInfo->status) { + case protocol::SUCCESS: { + VLOG(1) << "success to get reducerFileGroupInfo, shuffleId " << shuffleId; + std::lock_guard lock(mutex_); + if (reducerFileGroupInfos_.count(shuffleId) > 0) { + VLOG(1) << "reducerFileGroupInfo for shuffleId" << shuffleId + << " already exists, ignored"; + return; + } + reducerFileGroupInfos_[shuffleId] = std::move(reducerFileGroupInfo); + return; + } + case protocol::SHUFFLE_NOT_REGISTERED: { + // We cannot treat this as a failure. It indicates this is an empty + // shuffle. + LOG(WARNING) << "shuffleId " << shuffleId + << " is not registered when get reducerFileGroupInfo"; + std::lock_guard lock(mutex_); + if (reducerFileGroupInfos_.count(shuffleId) > 0) { + VLOG(1) << "reducerFileGroupInfo for shuffleId" << shuffleId + << " already exists, ignored"; + return; + } + reducerFileGroupInfos_[shuffleId] = std::move(reducerFileGroupInfo); + return; + } + case protocol::STAGE_END_TIME_OUT: + case protocol::SHUFFLE_DATA_LOST: { + LOG(ERROR) << "shuffleId " << shuffleId + << " failed getReducerFileGroupInfo with code " + << reducerFileGroupInfo->status; + CELEBORN_FAIL("failed protocol::GetReducerFileGroupResponse code"); + } + default: { + CELEBORN_FAIL("undefined protocol::GetReducerFileGroupResponse code"); + } + } +} + +bool ShuffleClientImpl::cleanupShuffle(int shuffleId) { + std::lock_guard lock(mutex_); + reducerFileGroupInfos_.erase(shuffleId); + return true; +} + +protocol::GetReducerFileGroupResponse& +ShuffleClientImpl::getReducerFileGroupInfo(int shuffleId) { + { + std::lock_guard lock(mutex_); + auto iter = reducerFileGroupInfos_.find(shuffleId); + if (iter != reducerFileGroupInfos_.end()) { + return *iter->second; + } + } + + updateReducerFileGroup(shuffleId); + { + std::lock_guard lock(mutex_); + return *reducerFileGroupInfos_[shuffleId]; + } +} +} // namespace client +} // namespace celeborn diff --git a/cpp/celeborn/client/ShuffleClient.h b/cpp/celeborn/client/ShuffleClient.h new file mode 100644 index 000000000..d66d1649d --- /dev/null +++ b/cpp/celeborn/client/ShuffleClient.h @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "celeborn/client/reader/CelebornInputStream.h" +#include "celeborn/network/NettyRpcEndpointRef.h" + +namespace celeborn { +namespace client { +class ShuffleClient { + public: + virtual void setupLifecycleManagerRef(std::string& host, int port) = 0; + + virtual void setupLifecycleManagerRef( + std::shared_ptr& lifecycleManagerRef) = 0; + + virtual void updateReducerFileGroup(int shuffleId) = 0; + + virtual std::unique_ptr readPartition( + int shuffleId, + int partitionId, + int attemptNumber, + int startMapIndex, + int endMapIndex) = 0; + + virtual bool cleanupShuffle(int shuffleId) = 0; + + virtual void shutdown() = 0; +}; + +class ShuffleClientImpl : public ShuffleClient { + public: + ShuffleClientImpl( + const std::string& appUniqueId, + const std::shared_ptr& conf, + const std::shared_ptr& clientFactory); + + void setupLifecycleManagerRef(std::string& host, int port) override; + + void setupLifecycleManagerRef( + std::shared_ptr& lifecycleManagerRef) + override; + + std::unique_ptr readPartition( + int shuffleId, + int partitionId, + int attemptNumber, + int startMapIndex, + int endMapIndex) override; + + void updateReducerFileGroup(int shuffleId) override; + + bool cleanupShuffle(int shuffleId) override; + + void shutdown() override {} + + private: + protocol::GetReducerFileGroupResponse& getReducerFileGroupInfo(int shuffleId); + + const std::string appUniqueId_; + std::shared_ptr conf_; + std::shared_ptr lifecycleManagerRef_; + std::shared_ptr clientFactory_; + std::mutex mutex_; + std::unordered_map< + long, + std::unique_ptr> + reducerFileGroupInfos_; +}; +} // namespace client +} // namespace celeborn diff --git a/cpp/celeborn/utils/CelebornUtils.cpp b/cpp/celeborn/utils/CelebornUtils.cpp index 862dad7cc..24340cb79 100644 --- a/cpp/celeborn/utils/CelebornUtils.cpp +++ b/cpp/celeborn/utils/CelebornUtils.cpp @@ -19,6 +19,10 @@ namespace celeborn { namespace utils { +std::string makeShuffleKey(const std::string& appId, const int shuffleId) { + return appId + "-" + std::to_string(shuffleId); +} + void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg) { buffer.write(msg.size()); buffer.writeFromString(msg); diff --git a/cpp/celeborn/utils/CelebornUtils.h b/cpp/celeborn/utils/CelebornUtils.h index 83e899533..1fab9352d 100644 --- a/cpp/celeborn/utils/CelebornUtils.h +++ b/cpp/celeborn/utils/CelebornUtils.h @@ -35,6 +35,18 @@ namespace utils { #define CELEBORN_SHUTDOWN_LOG(severity) \ LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX +template +std::vector toVector(const std::set& in) { + std::vector out{}; + out.reserve(in.size()); + for (const auto& i : in) { + out.emplace_back(i); + } + return std::move(out); +} + +std::string makeShuffleKey(const std::string& appId, int shuffleId); + void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg); void writeRpcAddress(