We should NOT deal with any network connections while holding mutex (#2355)

* try to fix hang issue in core sdk

* clang-format

* fix more

* fix
This commit is contained in:
JinmingHu 2021-06-09 15:40:44 +08:00 committed by GitHub
parent 5a477ad533
commit a3be057f59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -212,9 +212,12 @@ static void CleanupThread()
Logger::Level::Verbose,
"Clean pool - no connections on wake - return *************************");
CurlConnectionPool::g_curlConnectionPool.IsCleanThreadRunning = false;
return;
break;
}
decltype(CurlConnectionPool::g_curlConnectionPool
.ConnectionPoolIndex)::mapped_type connectionsToBeCleaned;
Log::Write(Logger::Level::Verbose, "Clean pool - inspect pool");
// loop the connection pool index - Note: lock is re-taken for the mutex
// Notes: The size of each host-index is always expected to be greater than 0 because the
@ -227,35 +230,37 @@ static void CleanupThread()
// of this, the oldest connection in the pool can be found at the end of the list. Looping the
// connection pool backwards until a connection that is not expired is found or until all
// connections are removed.
for (auto connection = --(index->second.end());
index->second.size() > 0 && connection->get()->IsExpired();
connection = index->second.size() > 0 ? --connection : connection)
auto& connectionList = index->second;
auto connectionIter = connectionList.end();
while (connectionIter != connectionList.begin())
{
// remove connection from the pool and update the connection to the next one
// which is going to be list.end()
Log::Write(Logger::Level::Verbose, "Clean pool - remove connection");
connection = index->second.erase(connection);
--connectionIter;
if ((*connectionIter)->IsExpired())
{
// remove connection from the pool and update the connection to the next one
// which is going to be list.end()
connectionsToBeCleaned.emplace_back(std::move(*connectionIter));
connectionIter = connectionList.erase(connectionIter);
}
else
{
break;
}
}
if (index->second.size() == 0)
if (connectionList.empty())
{
Log::Write(Logger::Level::Verbose, "Clean pool - remove index " + index->first);
index = CurlConnectionPool::g_curlConnectionPool.ConnectionPoolIndex.erase(index);
}
else
{
index = ++index;
++index;
}
}
if (CurlConnectionPool::g_curlConnectionPool.ConnectionPoolIndex.size() == 0)
{
Log::Write(
Logger::Level::Verbose,
"Clean pool - all connections removed. Return**********************");
CurlConnectionPool::g_curlConnectionPool.IsCleanThreadRunning = false;
return;
}
lockForPoolCleaning.unlock();
// Do actual connections release work here, without holding the mutex.
}
}
} // namespace
@ -1243,9 +1248,12 @@ std::unique_ptr<CurlNetworkConnection> CurlConnectionPool::ExtractOrCreateCurlCo
std::string const connectionKey = GetConnectionKey(host, options);
{
decltype(CurlConnectionPool::g_curlConnectionPool
.ConnectionPoolIndex)::mapped_type connectionsToBeReset;
// Critical section. Needs to own ConnectionPoolMutex before executing
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
std::unique_lock<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
// get a ref to the pool from the map of pools
auto hostPoolIndex = g_curlConnectionPool.ConnectionPoolIndex.find(connectionKey);
@ -1255,6 +1263,7 @@ std::unique_ptr<CurlNetworkConnection> CurlConnectionPool::ExtractOrCreateCurlCo
{
if (resetPool)
{
connectionsToBeReset = std::move(hostPoolIndex->second);
// clean the pool-index as requested in the call. Typically to force a new connection to be
// created and to discard all current connections in the pool for the host-index. A caller
// might request this after getting broken/closed connections multiple-times.
@ -1279,6 +1288,7 @@ std::unique_ptr<CurlNetworkConnection> CurlConnectionPool::ExtractOrCreateCurlCo
return connection;
}
}
lock.unlock();
}
// Creating a new connection is thread safe. No need to lock mutex here.
@ -1405,15 +1415,19 @@ void CurlConnectionPool::MoveConnectionBackToPool(
Log::Write(Logger::Level::Verbose, "Moving connection to pool...");
decltype(CurlConnectionPool::g_curlConnectionPool
.ConnectionPoolIndex)::mapped_type::value_type connectionToBeRemoved;
// Lock mutex to access connection pool. mutex is unlock as soon as lock is out of scope
std::lock_guard<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
std::unique_lock<std::mutex> lock(CurlConnectionPool::ConnectionPoolMutex);
auto& poolId = connection->GetConnectionKey();
auto& hostPool = g_curlConnectionPool.ConnectionPoolIndex[poolId];
if (hostPool.size() >= _detail::MaxConnectionsPerIndex)
if (hostPool.size() >= _detail::MaxConnectionsPerIndex && !hostPool.empty())
{
// Remove the last connection from the pool to insert this one.
auto lastConnection = --hostPool.end();
connectionToBeRemoved = std::move(*lastConnection);
hostPool.erase(lastConnection);
}
@ -1440,4 +1454,5 @@ void CurlConnectionPool::MoveConnectionBackToPool(
{
Log::Write(Logger::Level::Verbose, "Clean thread running. Won't start a new one.");
}
lock.unlock();
}