Refactor Kyuubi kubernetes block cleaner (#794)
Refactor Kyuubi kubernetes block cleaner
This commit is contained in:
parent
48ea3d1763
commit
bd42d8a2f8
10
build/dist
10
build/dist
@ -179,7 +179,6 @@ mkdir -p "$DISTDIR/pid"
|
||||
mkdir -p "$DISTDIR/logs"
|
||||
mkdir -p "$DISTDIR/work"
|
||||
mkdir -p "$DISTDIR/externals/engines/spark"
|
||||
mkdir -p "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
|
||||
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
|
||||
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
|
||||
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
|
||||
@ -195,9 +194,12 @@ cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR
|
||||
cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine-$VERSION.jar" "$DISTDIR/externals/engines/spark"
|
||||
|
||||
# Copy kyuubi tools
|
||||
cp -r "$KYUUBI_HOME/tools/kubernetes/docker/" "$DISTDIR/tools/kubernetes"
|
||||
cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/"
|
||||
cp "$KYUUBI_HOME/tools/kubernetes/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
|
||||
if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" ]]; then
|
||||
mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes/docker"
|
||||
mkdir -p "$DISTDIR/tools/spark-block-cleaner/jars"
|
||||
cp -r "$KYUUBI_HOME/tools/spark-block-cleaner/kubernetes/" "$DISTDIR/tools/spark-block-cleaner/kubernetes/"
|
||||
cp "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/spark-block-cleaner/jars/"
|
||||
fi
|
||||
|
||||
# Copy Kyuubi extension
|
||||
SPARK_MID_VERSION=${SPARK_VERSION%.*}
|
||||
|
||||
8
pom.xml
8
pom.xml
@ -37,7 +37,6 @@
|
||||
<module>kyuubi-main</module>
|
||||
<module>kyuubi-metrics</module>
|
||||
<module>kyuubi-zookeeper</module>
|
||||
<module>tools/kubernetes/spark-block-cleaner</module>
|
||||
</modules>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
@ -1694,5 +1693,12 @@
|
||||
<module>dev/kyuubi-extension-spark_3.1</module>
|
||||
</modules>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>spark-block-cleaner</id>
|
||||
<modules>
|
||||
<module>tools/spark-block-cleaner</module>
|
||||
</modules>
|
||||
</profile>
|
||||
</profiles>
|
||||
</project>
|
||||
|
||||
@ -1,26 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.tools
|
||||
|
||||
/**
 * Names of the environment variables recognised by the Spark block cleaner.
 */
object Constants {

  /** Comma separated list of directories to clean. */
  val CACHE_DIRS_KEY: String = "CACHE_DIRS"

  /** Age after which a file is removed by the regular clean. */
  val FILE_EXPIRED_TIME_KEY: String = "FILE_EXPIRED_TIME"

  /** Free space percentage below which a deep clean is triggered. */
  val FREE_SPACE_THRESHOLD_KEY: String = "FREE_SPACE_THRESHOLD"

  /** Pause between two cleaning rounds. */
  val SLEEP_TIME_KEY: String = "SLEEP_TIME"

  /** Age after which a file is removed by the deep clean. */
  val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: String = "DEEP_CLEAN_FILE_EXPIRED_TIME"
}
|
||||
@ -1,206 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.tools
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.{Files, Paths}
|
||||
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
|
||||
|
||||
import org.apache.kyuubi.Logging
|
||||
import org.apache.kyuubi.tools.Constants._
|
||||
/*
|
||||
* Spark storage shuffle data as the following structure.
|
||||
*
|
||||
* local-dir1/
|
||||
* blockmgr-uuid/
|
||||
* hash-sub-dir/
|
||||
* shuffle-data
|
||||
* shuffle-index
|
||||
*
|
||||
* local-dir2/
|
||||
* blockmgr-uuid/
|
||||
* hash-sub-dir/
|
||||
* shuffle-data
|
||||
* shuffle-index
|
||||
*
|
||||
* ...
|
||||
*/
|
||||
/**
 * Long running job that removes expired Spark block files from the directories listed in the
 * `CACHE_DIRS` environment variable.
 *
 * Every round first removes files older than `FILE_EXPIRED_TIME`; when the file system is still
 * fuller than `100 - FREE_SPACE_THRESHOLD` percent afterwards, a second "deep" pass removes files
 * older than `DEEP_CLEAN_FILE_EXPIRED_TIME`. All durations are in milliseconds.
 */
object KubernetesSparkBlockCleaner extends Logging {
  import scala.util.control.NonFatal

  private val envMap = System.getenv()

  private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY, "60").toInt
  private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY, "604800000").toLong
  private val sleepTime = envMap.getOrDefault(SLEEP_TIME_KEY, "3600000").toLong
  private val deepCleanFileExpiredTime =
    envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY, "432000000").toLong
  private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) {
    envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
  } else {
    throw new IllegalArgumentException(s"the env ${CACHE_DIRS_KEY} can not be null")
  }

  /**
   * Lists the entries of `dir`. `File.listFiles` returns null when `dir` is not a directory or
   * cannot be read (for example because it was removed concurrently); that case is mapped to an
   * empty array so callers never hit a NullPointerException.
   */
  private def children(dir: File): Array[File] =
    Option(dir.listFiles).getOrElse(Array.empty[File])

  /**
   * Removes expired files below the `blockmgr*` and `spark*` sub directories of `dir`, and the
   * directories themselves once they are empty.
   *
   * @param dir  one of the configured cache directories
   * @param time maximum age of a file in milliseconds before it is deleted
   */
  def doClean(dir: File, time: Long): Unit = {
    info(s"start clean ${dir.getName} with fileExpiredTime ${time}")

    // clean blockManager shuffle file
    children(dir)
      .filter(d => d.isDirectory && d.getName.startsWith("blockmgr"))
      .foreach { blockManagerDir =>
        info(s"start check blockManager dir ${blockManagerDir.getName}")
        children(blockManagerDir).filter(_.isDirectory).foreach { subDir =>
          info(s"start check sub dir ${subDir.getName}")
          children(subDir).foreach(file => checkAndDeleteFIle(file, time))
          // delete empty sub directory
          checkAndDeleteDir(subDir)
        }
        // delete empty blockManager directory
        checkAndDeleteDir(blockManagerDir)
      }

    // clean spark cache file
    children(dir)
      .filter(d => d.isDirectory && d.getName.startsWith("spark"))
      .foreach { cacheDir =>
        info(s"start check cache dir ${cacheDir.getName}")
        children(cacheDir).foreach(file => checkAndDeleteFIle(file, time))
        // delete empty spark cache directory
        checkAndDeleteDir(cacheDir)
      }
  }

  /**
   * Deletes `file` when it has not been modified for more than `time` milliseconds.
   * A failed deletion is only logged.
   */
  def checkAndDeleteFIle(file: File, time: Long): Unit = {
    info(s"check file ${file.getName}")
    if (System.currentTimeMillis() - file.lastModified() > time) {
      if (file.delete()) {
        info(s"delete file ${file.getName} success")
      } else {
        warn(s"delete file ${file.getName} fail")
      }
    }
  }

  /**
   * Deletes `dir` when it is an existing, readable and empty directory.
   * A failed deletion is only logged.
   */
  def checkAndDeleteDir(dir: File): Unit = {
    // a null listing (missing or unreadable directory) is not treated as "empty"
    if (Option(dir.listFiles).exists(_.isEmpty)) {
      if (dir.delete()) {
        info(s"delete dir ${dir.getName} success")
      } else {
        warn(s"delete dir ${dir.getName} fail")
      }
    }
  }

  /**
   * Tells whether the file system holding `dir` is fuller than `100 - FREE_SPACE_THRESHOLD`
   * percent.
   *
   * The percentage is computed from the file store the same way `df` derives its `Use%` column
   * (used / (used + available), rounded up), without spawning external processes or parsing
   * their output.
   *
   * @throws java.io.IOException when the file store of `dir` cannot be accessed
   */
  def checkUsedCapacity(dir: String): Boolean = {
    val store = Files.getFileStore(Paths.get(dir))
    val usedBytes = store.getTotalSpace - store.getUnallocatedSpace
    val reachableBytes = usedBytes + store.getUsableSpace
    val used =
      if (reachableBytes <= 0) 0 else math.ceil(usedBytes * 100.0 / reachableBytes).toInt
    info(s"${dir} now used ${used}% space")

    used > (100 - freeSpaceThreshold)
  }

  /**
   * Validates the values read from the environment and logs the effective configuration.
   *
   * @throws IllegalArgumentException when a value is out of range
   */
  def initializeConfiguration(): Unit = {
    if (cacheDirs.isEmpty) {
      throw new IllegalArgumentException(s"the env ${CACHE_DIRS_KEY} " +
        s"should contain at least one directory")
    }

    if (fileExpiredTime <= 0) {
      throw new IllegalArgumentException(s"the env ${FILE_EXPIRED_TIME_KEY} " +
        s"should be greater than 0")
    }

    if (deepCleanFileExpiredTime <= 0) {
      throw new IllegalArgumentException(s"the env ${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY} " +
        s"should be greater than 0")
    }

    if (sleepTime <= 0) {
      throw new IllegalArgumentException(s"the env ${SLEEP_TIME_KEY} " +
        s"should be greater than 0")
    }

    if (freeSpaceThreshold < 0 || freeSpaceThreshold > 100) {
      throw new IllegalArgumentException(s"the env ${FREE_SPACE_THRESHOLD_KEY} " +
        s"should between 0 and 100")
    }

    info(s"finish initializing configuration, " +
      s"use ${CACHE_DIRS_KEY}: ${cacheDirs.mkString(",")}, " +
      s"${FILE_EXPIRED_TIME_KEY}: ${fileExpiredTime}, " +
      s"${FREE_SPACE_THRESHOLD_KEY}: ${freeSpaceThreshold}, " +
      s"${SLEEP_TIME_KEY}: ${sleepTime}, " +
      s"${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY}: ${deepCleanFileExpiredTime}")
  }

  /** Creates the executor that runs the per directory clean tasks. */
  def initializeThreadPool(): ThreadPoolExecutor = {
    new ThreadPoolExecutor(
      cacheDirs.length,
      cacheDirs.length * 2,
      0,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable]())
  }

  /**
   * Runs the regular clean for `pathStr` and, if the disk is still too full afterwards, the deep
   * clean. Everything happens sequentially on the calling thread so that each capacity check
   * observes the result of the clean that precedes it. Failures are logged instead of silently
   * killing the worker thread.
   */
  private def cleanDirectory(pathStr: String): Unit = {
    try {
      val dir = Paths.get(pathStr).toFile
      // Clean up files older than $fileExpiredTime
      doClean(dir, fileExpiredTime)

      if (checkUsedCapacity(pathStr)) {
        info("start deep clean job")
        // Clean up files older than $deepCleanFileExpiredTime
        doClean(dir, deepCleanFileExpiredTime)
        if (checkUsedCapacity(pathStr)) {
          warn(s"after deep clean ${pathStr} " +
            s"used space still higher than ${freeSpaceThreshold}")
        }
      }
    } catch {
      case NonFatal(e) => warn(s"clean job for ${pathStr} failed: ${e}")
    }
  }

  /**
   * Entry point: validates the configuration, then submits one clean task per cache directory
   * every `SLEEP_TIME` milliseconds until the process is stopped.
   *
   * @throws IllegalArgumentException when a configured cache directory is missing or is not a
   *                                  directory
   */
  def main(args: Array[String]): Unit = {
    initializeConfiguration()
    val threadPool = initializeThreadPool()
    try {
      while (true) {
        info("start clean job")
        cacheDirs.foreach { pathStr =>
          val path = Paths.get(pathStr)

          if (!Files.exists(path)) {
            throw new IllegalArgumentException(s"this path ${pathStr} does not exists")
          }

          if (!Files.isDirectory(path)) {
            throw new IllegalArgumentException(s"this path ${pathStr} is not directory")
          }

          threadPool.execute(() => cleanDirectory(pathStr))
        }
        // Once $sleepTime
        Thread.sleep(sleepTime)
      }
    } finally {
      threadPool.shutdown()
    }
  }
}
|
||||
@ -23,7 +23,8 @@ RUN apt-get update && \
|
||||
mkdir -p /log/cleanerLog
|
||||
|
||||
COPY jars /opt/block-cleaner
|
||||
COPY entrypoint.sh /opt/entrypoint.sh
|
||||
COPY tools/spark-block-cleaner/jars /opt/block-cleaner
|
||||
COPY tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh /opt/entrypoint.sh
|
||||
|
||||
RUN chmod +x /opt/entrypoint.sh
|
||||
|
||||
@ -48,19 +48,19 @@ spec:
|
||||
# the target dirs which in container
|
||||
- name: CACHE_DIRS
|
||||
value: /data/data1,/data/data2
|
||||
# Cleaner will clean More distant block files
|
||||
# Cleaner removes block files that have not been modified for longer than this, in seconds
|
||||
- name: FILE_EXPIRED_TIME
|
||||
value: 604800000
|
||||
# Deep clean fileExpiredTime
|
||||
value: 604800
|
||||
# Deep clean fileExpiredTime, seconds
|
||||
- name: DEEP_CLEAN_FILE_EXPIRED_TIME
|
||||
value: 432000000
|
||||
value: 432000
|
||||
# After first clean, if free Space low than threshold
|
||||
# trigger deep clean
|
||||
- name: FREE_SPACE_THRESHOLD
|
||||
value: 60
|
||||
# Cleaner clean sleep times after cleaning
|
||||
- name: SLEEP_TIME
|
||||
value: 3600000
|
||||
# Interval between the starts of two cleaning rounds, in seconds
|
||||
- name: SCHEDULE_INTERVAL
|
||||
value: 3600
|
||||
volumes:
|
||||
# Directory on the host which store block dirs
|
||||
- name: block-files-dir-1
|
||||
@ -22,16 +22,15 @@
|
||||
<artifactId>kyuubi</artifactId>
|
||||
<groupId>org.apache.kyuubi</groupId>
|
||||
<version>1.3.0-SNAPSHOT</version>
|
||||
<relativePath>../../../pom.xml</relativePath>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spark-block-cleaner</artifactId>
|
||||
<name>Kyuubi Tool Kubernetes Spark Block Cleaner</name>
|
||||
<name>Kyuubi Project Spark Block Cleaner</name>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kyuubi</groupId>
|
||||
<artifactId>kyuubi-common</artifactId>
|
||||
@ -46,6 +45,11 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
<artifactId>scalatest_${scala.binary.version}</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
@ -0,0 +1,206 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.tools
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.{Files, Paths}
|
||||
import java.util.concurrent.{CountDownLatch, Executors}
|
||||
|
||||
import org.apache.kyuubi.Logging
|
||||
|
||||
/*
|
||||
* Spark storage shuffle data as the following structure.
|
||||
*
|
||||
* local-dir1/
|
||||
* blockmgr-uuid/
|
||||
* hash-sub-dir/
|
||||
* shuffle-data
|
||||
* shuffle-index
|
||||
*
|
||||
* local-dir2/
|
||||
* blockmgr-uuid/
|
||||
* hash-sub-dir/
|
||||
* shuffle-data
|
||||
* shuffle-index
|
||||
*
|
||||
* ...
|
||||
*/
|
||||
/**
 * Long running job that removes expired Spark block files from the directories listed in the
 * `CACHE_DIRS` environment variable, one worker thread per directory.
 *
 * Every round first removes files older than `FILE_EXPIRED_TIME`; when the file system is still
 * fuller than `100 - FREE_SPACE_THRESHOLD` percent afterwards, a second "deep" pass removes files
 * older than `DEEP_CLEAN_FILE_EXPIRED_TIME`. Durations are configured in seconds and converted
 * to milliseconds when they are read.
 */
object KubernetesSparkBlockCleaner extends Logging {
  import scala.util.control.NonFatal

  import KubernetesSparkBlockCleanerConstants._

  private val envMap = System.getenv()

  private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY, "60").toInt
  private val fileExpiredTime =
    envMap.getOrDefault(FILE_EXPIRED_TIME_KEY, "604800").toLong * 1000
  private val scheduleInterval =
    envMap.getOrDefault(SCHEDULE_INTERVAL, "3600").toLong * 1000
  private val deepCleanFileExpiredTime =
    envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY, "432000").toLong * 1000
  private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) {
    envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
  } else {
    throw new IllegalArgumentException(s"the env $CACHE_DIRS_KEY must be set")
  }
  // when true, main performs a single round instead of looping forever
  private val isTesting = envMap.getOrDefault("kyuubi.testing", "false").toBoolean
  checkConfiguration()

  /**
   * one thread clean one dir
   */
  private val threadPool = Executors.newFixedThreadPool(cacheDirs.length)

  /**
   * Validates the values read from the environment and logs the effective configuration.
   * `require` raises an IllegalArgumentException for the first violated constraint.
   */
  private def checkConfiguration(): Unit = {
    require(fileExpiredTime > 0,
      s"the env $FILE_EXPIRED_TIME_KEY should be greater than 0")
    require(deepCleanFileExpiredTime > 0,
      s"the env $DEEP_CLEAN_FILE_EXPIRED_TIME_KEY should be greater than 0")
    require(scheduleInterval > 0,
      s"the env $SCHEDULE_INTERVAL should be greater than 0")
    require(freeSpaceThreshold > 0 && freeSpaceThreshold < 100,
      s"the env $FREE_SPACE_THRESHOLD_KEY should between 0 and 100")
    require(cacheDirs.nonEmpty, s"the env $CACHE_DIRS_KEY must be set")
    cacheDirs.foreach { dir =>
      val path = Paths.get(dir)
      require(Files.exists(path),
        s"the input cache dir: $dir does not exists")
      require(Files.isDirectory(path),
        s"the input cache dir: $dir should be a directory")
    }

    info(s"finish initializing configuration, " +
      s"use $CACHE_DIRS_KEY: ${cacheDirs.mkString(",")}, " +
      s"$FILE_EXPIRED_TIME_KEY: $fileExpiredTime, " +
      s"$FREE_SPACE_THRESHOLD_KEY: $freeSpaceThreshold, " +
      s"$SCHEDULE_INTERVAL: $scheduleInterval, " +
      s"$DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: $deepCleanFileExpiredTime")
  }

  /**
   * Lists the entries of `dir`. `File.listFiles` returns null when `dir` is not a directory or
   * cannot be read (for example because it was removed concurrently); that case is mapped to an
   * empty array so callers never hit a NullPointerException.
   */
  private def children(dir: File): Array[File] =
    Option(dir.listFiles).getOrElse(Array.empty[File])

  /** True only for an existing, readable directory without any entry. */
  private def isEmptyDir(file: File): Boolean =
    file.isDirectory && Option(file.listFiles).exists(_.isEmpty)

  /**
   * Removes expired files below the `blockmgr*` and `spark*` sub directories of `dir` and logs
   * how much space was released.
   *
   * @param dir  one of the configured cache directories
   * @param time maximum age of a file in milliseconds before it is deleted
   */
  private def doClean(dir: File, time: Long): Unit = {
    // clean blockManager shuffle file
    children(dir)
      .filter(d => d.isDirectory && d.getName.startsWith("blockmgr"))
      .foreach { blockManagerDir =>
        info(s"start check blockManager dir ${blockManagerDir.getCanonicalPath}")
        // check blockManager directory
        val released = children(blockManagerDir).filter(_.isDirectory).map { subDir =>
          debug(s"start check sub dir ${subDir.getCanonicalPath}")
          // check sub directory
          children(subDir).map(file => checkAndDeleteFile(file, time)).sum
        }
        // The hash sub directories are only removed when every one of them is empty, i.e. the
        // blockManager directory as a whole holds no data any more.
        val remaining = children(blockManagerDir)
        if (remaining.forall(isEmptyDir)) {
          remaining.foreach(checkAndDeleteFile(_, time, isDir = true))
          checkAndDeleteFile(blockManagerDir, time, isDir = true)
        }
        info(s"finished clean blockManager dir ${blockManagerDir.getCanonicalPath}, " +
          s"released space: ${released.sum / 1024 / 1024} MB.")
      }

    // clean spark cache file
    children(dir)
      .filter(d => d.isDirectory && d.getName.startsWith("spark"))
      .foreach { cacheDir =>
        info(s"start check cache dir ${cacheDir.getCanonicalPath}")
        val released = children(cacheDir).map(file => checkAndDeleteFile(file, time))
        // delete empty spark cache directory
        checkAndDeleteFile(cacheDir, time, isDir = true)
        info(s"finished clean cache dir ${cacheDir.getCanonicalPath}, " +
          s"released space: ${released.sum / 1024 / 1024} MB.")
      }
  }

  /**
   * Deletes `file` when it has not been modified for more than `time` milliseconds. With
   * `isDir = true` the target must additionally be an empty, readable directory.
   *
   * @return the number of bytes released; 0 for directories, for files that are kept and for
   *         failed deletions (which are logged)
   */
  private def checkAndDeleteFile(file: File, time: Long, isDir: Boolean = false): Long = {
    debug(s"check file ${file.getName}")
    val expired = System.currentTimeMillis() - file.lastModified() > time
    val shouldDeleteFile =
      if (isDir) expired && Option(file.listFiles).exists(_.isEmpty) else expired
    // the size has to be read before the file is gone
    val length = if (isDir) 0L else file.length()
    if (!shouldDeleteFile) {
      0L
    } else if (file.delete()) {
      debug(s"delete file ${file.getAbsolutePath} success")
      length
    } else {
      warn(s"delete file ${file.getAbsolutePath} fail")
      0L
    }
  }

  /**
   * Tells whether the file system holding `dir` is fuller than `100 - FREE_SPACE_THRESHOLD`
   * percent.
   *
   * The percentage is computed from the file store the same way `df` derives its `Use%` column
   * (used / (used + available), rounded up), without spawning external processes or parsing
   * their output.
   */
  private def needToDeepClean(dir: String): Boolean = {
    val store = Files.getFileStore(Paths.get(dir))
    val usedBytes = store.getTotalSpace - store.getUnallocatedSpace
    val reachableBytes = usedBytes + store.getUsableSpace
    val used =
      if (reachableBytes <= 0) 0 else math.ceil(usedBytes * 100.0 / reachableBytes).toInt
    info(s"$dir now used $used% space")

    used > (100 - freeSpaceThreshold)
  }

  /** Runs the regular clean for `dir` followed, if the disk is still too full, by a deep clean. */
  private def doCleanJob(dir: String): Unit = {
    val startTime = System.currentTimeMillis()
    val path = Paths.get(dir)
    info(s"start clean job for $dir")
    doClean(path.toFile, fileExpiredTime)
    // re check if the disk has enough space
    if (needToDeepClean(dir)) {
      info(s"start deep clean job for $dir")
      doClean(path.toFile, deepCleanFileExpiredTime)
      if (needToDeepClean(dir)) {
        warn(s"after deep clean $dir, used space still higher than $freeSpaceThreshold")
      }
    }
    val finishedTime = System.currentTimeMillis()
    info(s"clean job $dir finished, elapsed time: ${(finishedTime - startTime) / 1000} s.")
  }

  /**
   * Entry point: cleans all cache directories in parallel, waits for every job to finish and
   * sleeps for the remainder of the schedule interval. The loop runs once when `kyuubi.testing`
   * is true in the environment and forever otherwise.
   */
  def main(args: Array[String]): Unit = {
    do {
      info(s"start all clean job")
      val startTime = System.currentTimeMillis()
      val hasFinished = new CountDownLatch(cacheDirs.length)
      cacheDirs.foreach { dir =>
        threadPool.execute(() => {
          try {
            doCleanJob(dir)
          } catch {
            case NonFatal(e) => warn(s"clean job for $dir failed: $e")
          } finally {
            // must always run, otherwise a failed job blocks await() below forever
            hasFinished.countDown()
          }
        })
      }
      hasFinished.await()

      val usedTime = System.currentTimeMillis() - startTime
      info(s"finished to clean all dir, elapsed time $usedTime")
      if (usedTime > scheduleInterval) {
        warn(s"clean job elapsed time $usedTime which is greater than $scheduleInterval")
      } else {
        Thread.sleep(scheduleInterval - usedTime)
      }
    } while (!isTesting)
  }
}
|
||||
|
||||
/**
 * Names of the environment variables recognised by the Kubernetes Spark block cleaner.
 */
object KubernetesSparkBlockCleanerConstants {

  /** Comma separated list of directories to clean. */
  val CACHE_DIRS_KEY: String = "CACHE_DIRS"

  /** Age after which a file is removed by the regular clean. */
  val FILE_EXPIRED_TIME_KEY: String = "FILE_EXPIRED_TIME"

  /** Free space percentage below which a deep clean is triggered. */
  val FREE_SPACE_THRESHOLD_KEY: String = "FREE_SPACE_THRESHOLD"

  /** Interval between two cleaning rounds. */
  val SCHEDULE_INTERVAL: String = "SCHEDULE_INTERVAL"

  /** Age after which a file is removed by the deep clean. */
  val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: String = "DEEP_CLEAN_FILE_EXPIRED_TIME"
}
|
||||
@ -0,0 +1,109 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.tools
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.Files
|
||||
import java.util.UUID
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
|
||||
|
||||
/**
 * Runs a single round of [[KubernetesSparkBlockCleaner]] against a temporary directory tree and
 * checks which files and directories survive.
 */
class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite {
  import KubernetesSparkBlockCleanerConstants._

  private val rootDir = Utils.createTempDir()
  private val cacheDir = Seq("1", "2").map(rootDir.resolve)
  private val block1 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}")
  private val block2 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}")

  // do not remove
  private val subDir1 = new File(block1, "01")
  // do not remove
  private val data11 = new File(subDir1, "shuffle_0_0_0")
  // remove
  private val data12 = new File(subDir1, "shuffle_0_0_1")

  // remove
  private val subDir2 = new File(block2, "02")
  // remove; note that this file lives in subDir1, so subDir2 itself stays empty
  private val data21 = new File(subDir1, "shuffle_0_1_0")

  /**
   * Deletes `path` and everything below it. A path that does not exist (any more) is ignored:
   * `listFiles` returns null in that case.
   */
  private def deleteRecursive(path: File): Unit = {
    Option(path.listFiles).foreach { entries =>
      entries.foreach { f =>
        if (f.isDirectory) {
          deleteRecursive(f)
        } else {
          f.delete()
        }
      }
    }
    path.delete()
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    cacheDir.foreach(Files.createDirectories(_))

    // create some dir
    Files.createDirectories(block1.toPath)
    // hash sub dir
    Files.createDirectory(subDir1.toPath)
    // fresh file, must survive the clean
    data11.createNewFile()
    data11.setLastModified(System.currentTimeMillis() - 10)
    // old file (10000 s), must be removed with FILE_EXPIRED_TIME = 600 s
    data12.createNewFile()
    Files.write(data12.toPath, "111".getBytes())
    data12.setLastModified(System.currentTimeMillis() - 10000000)

    Files.createDirectories(block2.toPath)
    Files.createDirectory(subDir2.toPath)
    subDir2.setLastModified(System.currentTimeMillis() - 10000000)
    data21.createNewFile()
    data21.setLastModified(System.currentTimeMillis() - 10000000)
  }

  override def afterAll(): Unit = {
    try {
      // removes block1/block2 together with the cache directories that contain them
      cacheDir.foreach(dir => deleteRecursive(dir.toFile))
    } finally {
      super.afterAll()
    }
  }

  /**
   * Injects `name=value` into the environment map of the running JVM through reflection on the
   * private field `m` of the map returned by `System.getenv`.
   */
  private def updateEnv(name: String, value: String): Unit = {
    val env = System.getenv
    val field = env.getClass.getDeclaredField("m")
    field.setAccessible(true)
    field.get(env).asInstanceOf[java.util.Map[String, String]].put(name, value)
  }

  test("test clean") {
    updateEnv(CACHE_DIRS_KEY, cacheDir.mkString(","))
    updateEnv(FILE_EXPIRED_TIME_KEY, "600")
    updateEnv(SCHEDULE_INTERVAL, "1")
    updateEnv("kyuubi.testing", "true")

    KubernetesSparkBlockCleaner.main(Array.empty)

    assert(block1.exists())
    assert(subDir1.exists())
    assert(data11.exists())
    assert(!data12.exists())

    assert(block2.exists())
    assert(!subDir2.exists())
    assert(!data21.exists())
  }
}
|
||||
Loading…
Reference in New Issue
Block a user