[CELEBORN-1915][CIP-14] Add reader's ShuffleClient to cppClient

### What changes were proposed in this pull request?
This PR adds the reader-side ShuffleClient to cppClient.

### Why are the changes needed?
ShuffleClient is the user-facing interface through which applications use cppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation.

Closes #3156 from HolyLow/issue/celeborn-1915-add-reader-shuffle-client-to-cppclient.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
HolyLow 2025-03-25 17:54:34 +08:00 committed by mingji
parent 0a97ca0aa9
commit 9bae3fbd5e
5 changed files with 245 additions and 1 deletions

View File

@ -15,7 +15,8 @@
add_library(
client
reader/WorkerPartitionReader.cpp
reader/CelebornInputStream.cpp)
reader/CelebornInputStream.cpp
ShuffleClient.cpp)
target_include_directories(client PUBLIC ${CMAKE_BINARY_DIR})

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "celeborn/client/ShuffleClient.h"
#include "celeborn/utils/CelebornUtils.h"
namespace celeborn {
namespace client {
ShuffleClientImpl::ShuffleClientImpl(
const std::string& appUniqueId,
const std::shared_ptr<const conf::CelebornConf>& conf,
const std::shared_ptr<network::TransportClientFactory>& clientFactory)
: appUniqueId_(appUniqueId), conf_(conf), clientFactory_(clientFactory) {}
/// Connects to the lifecycle manager at host:port and installs the resulting
/// endpoint reference.
///
/// The transport client is created before the mutex is taken; the endpoint
/// reference is built and assigned while holding it.
///
/// NOTE(review): the "dummy" / 0 arguments look like placeholder values for
/// the local side of the endpoint -- confirm against the
/// NettyRpcEndpointRef constructor.
void ShuffleClientImpl::setupLifecycleManagerRef(std::string& host, int port) {
  auto transportClient = clientFactory_->createClient(host, port);
  std::lock_guard<std::mutex> guard(mutex_);
  lifecycleManagerRef_ = std::make_shared<network::NettyRpcEndpointRef>(
      "LifecycleManagerEndpoint", "dummy", 0, host, port, transportClient);
}
/// Installs an already constructed lifecycle manager endpoint reference,
/// replacing any previous one. The assignment happens under the mutex.
void ShuffleClientImpl::setupLifecycleManagerRef(
    std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef) {
  std::lock_guard<std::mutex> guard(mutex_);
  lifecycleManagerRef_ = lifecycleManagerRef;
}
/// Opens an input stream over one partition of a shuffle.
///
/// Looks up (fetching it on first use) the reducer file group info of
/// `shuffleId`, collects the partition locations registered for
/// `partitionId` -- none if the partition has no file group -- and hands
/// them to a new CelebornInputStream together with the recorded attempts.
///
/// @param shuffleId     Shuffle to read from.
/// @param partitionId   Partition whose locations are read.
/// @param attemptNumber Forwarded unchanged to CelebornInputStream.
/// @param startMapIndex Forwarded unchanged to CelebornInputStream.
/// @param endMapIndex   Forwarded unchanged to CelebornInputStream.
/// @return A stream owning the resolved list of locations.
std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
    int shuffleId,
    int partitionId,
    int attemptNumber,
    int startMapIndex,
    int endMapIndex) {
  const auto& reducerFileGroupInfo = getReducerFileGroupInfo(shuffleId);
  std::string shuffleKey = utils::makeShuffleKey(appUniqueId_, shuffleId);
  std::vector<std::shared_ptr<const protocol::PartitionLocation>> locations;
  // A single find() replaces the former empty()/count()/find() sequence.
  const auto groupIter = reducerFileGroupInfo.fileGroups.find(partitionId);
  if (groupIter != reducerFileGroupInfo.fileGroups.end()) {
    // toVector() returns a prvalue, so plain assignment already moves;
    // wrapping it in std::move was redundant.
    locations = utils::toVector(groupIter->second);
  }
  return std::make_unique<CelebornInputStream>(
      shuffleKey,
      conf_,
      clientFactory_,
      std::move(locations),
      reducerFileGroupInfo.attempts,
      attemptNumber,
      startMapIndex,
      endMapIndex);
}
void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
CELEBORN_CHECK(
lifecycleManagerRef_, "lifecycleManagerRef_ is not initialized");
// Send the query request to lifecycleManager.
auto reducerFileGroupInfo = lifecycleManagerRef_->askSync(
protocol::GetReducerFileGroup{shuffleId},
conf_->clientRpcGetReducerFileGroupRpcAskTimeout());
switch (reducerFileGroupInfo->status) {
case protocol::SUCCESS: {
VLOG(1) << "success to get reducerFileGroupInfo, shuffleId " << shuffleId;
std::lock_guard<std::mutex> lock(mutex_);
if (reducerFileGroupInfos_.count(shuffleId) > 0) {
VLOG(1) << "reducerFileGroupInfo for shuffleId" << shuffleId
<< " already exists, ignored";
return;
}
reducerFileGroupInfos_[shuffleId] = std::move(reducerFileGroupInfo);
return;
}
case protocol::SHUFFLE_NOT_REGISTERED: {
// We cannot treat this as a failure. It indicates this is an empty
// shuffle.
LOG(WARNING) << "shuffleId " << shuffleId
<< " is not registered when get reducerFileGroupInfo";
std::lock_guard<std::mutex> lock(mutex_);
if (reducerFileGroupInfos_.count(shuffleId) > 0) {
VLOG(1) << "reducerFileGroupInfo for shuffleId" << shuffleId
<< " already exists, ignored";
return;
}
reducerFileGroupInfos_[shuffleId] = std::move(reducerFileGroupInfo);
return;
}
case protocol::STAGE_END_TIME_OUT:
case protocol::SHUFFLE_DATA_LOST: {
LOG(ERROR) << "shuffleId " << shuffleId
<< " failed getReducerFileGroupInfo with code "
<< reducerFileGroupInfo->status;
CELEBORN_FAIL("failed protocol::GetReducerFileGroupResponse code");
}
default: {
CELEBORN_FAIL("undefined protocol::GetReducerFileGroupResponse code");
}
}
}
/// Drops the cached reducer file group info of `shuffleId`, if there is one.
///
/// @return Always true, whether or not an entry was cached.
bool ShuffleClientImpl::cleanupShuffle(int shuffleId) {
  std::lock_guard<std::mutex> guard(mutex_);
  const auto cached = reducerFileGroupInfos_.find(shuffleId);
  if (cached != reducerFileGroupInfos_.end()) {
    reducerFileGroupInfos_.erase(cached);
  }
  return true;
}
/// Returns the cached reducer file group info of `shuffleId`, fetching it
/// from the lifecycle manager first when it is not cached yet.
///
/// The mutex is released while updateReducerFileGroup() runs, because that
/// function takes the same mutex itself.
///
/// NOTE(review): the returned reference points into the cache and is used
/// after the mutex is released; a concurrent cleanupShuffle() for the same
/// shuffle would invalidate it. Callers must not overlap the two.
protocol::GetReducerFileGroupResponse&
ShuffleClientImpl::getReducerFileGroupInfo(int shuffleId) {
  {
    std::lock_guard<std::mutex> lock(mutex_);
    auto iter = reducerFileGroupInfos_.find(shuffleId);
    if (iter != reducerFileGroupInfos_.end()) {
      return *iter->second;
    }
  }
  updateReducerFileGroup(shuffleId);
  std::lock_guard<std::mutex> lock(mutex_);
  // Use find() rather than operator[]: if the entry is gone again (for
  // example erased by cleanupShuffle() in between), operator[] would insert
  // a null pointer and the dereference below would be undefined behaviour.
  auto iter = reducerFileGroupInfos_.find(shuffleId);
  CELEBORN_CHECK(
      iter != reducerFileGroupInfos_.end() && iter->second != nullptr,
      "reducerFileGroupInfo is missing after update");
  return *iter->second;
}
} // namespace client
} // namespace celeborn

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "celeborn/client/reader/CelebornInputStream.h"
#include "celeborn/network/NettyRpcEndpointRef.h"
namespace celeborn {
namespace client {
/// Reader-side interface of the Celeborn C++ shuffle client.
class ShuffleClient {
 public:
  // Instances are used polymorphically, so destruction through a base
  // pointer must reach the derived destructor.
  virtual ~ShuffleClient() = default;

  /// Sets up the lifecycle manager endpoint from a host and port.
  virtual void setupLifecycleManagerRef(std::string& host, int port) = 0;

  /// Sets up the lifecycle manager endpoint from an existing reference.
  virtual void setupLifecycleManagerRef(
      std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef) = 0;

  /// Refreshes the reducer file group information of `shuffleId`.
  virtual void updateReducerFileGroup(int shuffleId) = 0;

  /// Opens an input stream over partition `partitionId` of `shuffleId`.
  virtual std::unique_ptr<CelebornInputStream> readPartition(
      int shuffleId,
      int partitionId,
      int attemptNumber,
      int startMapIndex,
      int endMapIndex) = 0;

  /// Releases client-side state kept for `shuffleId`.
  virtual bool cleanupShuffle(int shuffleId) = 0;

  /// Shuts the client down.
  virtual void shutdown() = 0;
};
/// ShuffleClient implementation that queries a lifecycle manager endpoint and
/// caches one GetReducerFileGroupResponse per shuffle id.
class ShuffleClientImpl : public ShuffleClient {
 public:
  ShuffleClientImpl(
      const std::string& appUniqueId,
      const std::shared_ptr<const conf::CelebornConf>& conf,
      const std::shared_ptr<network::TransportClientFactory>& clientFactory);

  void setupLifecycleManagerRef(std::string& host, int port) override;

  void setupLifecycleManagerRef(
      std::shared_ptr<network::NettyRpcEndpointRef>& lifecycleManagerRef)
      override;

  std::unique_ptr<CelebornInputStream> readPartition(
      int shuffleId,
      int partitionId,
      int attemptNumber,
      int startMapIndex,
      int endMapIndex) override;

  void updateReducerFileGroup(int shuffleId) override;

  bool cleanupShuffle(int shuffleId) override;

  // Nothing to release explicitly; members clean up in their destructors.
  void shutdown() override {}

 private:
  // Returns the cached response for shuffleId, fetching it first if absent.
  protocol::GetReducerFileGroupResponse& getReducerFileGroupInfo(int shuffleId);

  const std::string appUniqueId_;
  std::shared_ptr<const conf::CelebornConf> conf_;
  // Unset until one of the setupLifecycleManagerRef() overloads is called.
  std::shared_ptr<network::NettyRpcEndpointRef> lifecycleManagerRef_;
  std::shared_ptr<network::TransportClientFactory> clientFactory_;
  // Held while assigning lifecycleManagerRef_ and while touching
  // reducerFileGroupInfos_.
  std::mutex mutex_;
  // Cache keyed by shuffle id; the key type matches the `int shuffleId` used
  // throughout the interface.
  std::unordered_map<
      int,
      std::unique_ptr<protocol::GetReducerFileGroupResponse>>
      reducerFileGroupInfos_;
};
} // namespace client
} // namespace celeborn

View File

@ -19,6 +19,10 @@
namespace celeborn {
namespace utils {
/// Builds the shuffle key "<appId>-<shuffleId>".
std::string makeShuffleKey(const std::string& appId, const int shuffleId) {
  std::string shuffleKey = appId;
  shuffleKey += "-";
  shuffleKey += std::to_string(shuffleId);
  return shuffleKey;
}
void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg) {
buffer.write<short>(msg.size());
buffer.writeFromString(msg);

View File

@ -35,6 +35,18 @@ namespace utils {
// Starts a LOG(severity) statement whose first streamed item is
// CELEBORN_SHUTDOWN_LOG_PREFIX; callers append the message with <<.
#define CELEBORN_SHUTDOWN_LOG(severity) \
LOG(severity) << CELEBORN_SHUTDOWN_LOG_PREFIX
/// Copies the elements of a std::set into a std::vector, keeping the set's
/// iteration (sorted) order.
///
/// @tparam T Element type; must be copy-constructible.
/// @param in Source set; left unmodified.
/// @return A vector holding a copy of every element of `in`.
template <typename T>
std::vector<T> toVector(const std::set<T>& in) {
  // Returned as a prvalue so copy elision applies; the former
  // `return std::move(out)` disabled NRVO.
  return std::vector<T>(in.begin(), in.end());
}
// Builds the shuffle key "<appId>-<shuffleId>".
std::string makeShuffleKey(const std::string& appId, int shuffleId);
void writeUTF(memory::WriteOnlyByteBuffer& buffer, const std::string& msg);
void writeRpcAddress(