From bd42d8a2f8b46852bf9bfc30032600f8a680102a Mon Sep 17 00:00:00 2001 From: ulysses Date: Wed, 14 Jul 2021 11:42:06 +0800 Subject: [PATCH] Refactor Kyuubi kubernetes block cleaner (#794) Refactor Kyuubi kubernetes block cleaner --- build/dist | 10 +- pom.xml | 8 +- .../org/apache/kyuubi/tools/Constants.scala | 26 --- .../tools/KubernetesSparkBlockCleaner.scala | 206 ------------------ .../kubernetes/docker}/Dockerfile | 3 +- .../kubernetes/docker}/entrypoint.sh | 0 .../kubernetes}/spark-block-cleaner.yml | 14 +- .../spark-block-cleaner/pom.xml | 10 +- .../tools/KubernetesSparkBlockCleaner.scala | 206 ++++++++++++++++++ .../KubernetesSparkBlockCleanerSuite.scala | 109 +++++++++ 10 files changed, 344 insertions(+), 248 deletions(-) delete mode 100644 tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala delete mode 100644 tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala rename tools/{kubernetes/docker/spark-block-cleaner => spark-block-cleaner/kubernetes/docker}/Dockerfile (88%) rename tools/{kubernetes/docker/spark-block-cleaner => spark-block-cleaner/kubernetes/docker}/entrypoint.sh (100%) rename tools/{kubernetes/docker/spark-block-cleaner => spark-block-cleaner/kubernetes}/spark-block-cleaner.yml (88%) rename tools/{kubernetes => }/spark-block-cleaner/pom.xml (87%) create mode 100644 tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala create mode 100644 tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala diff --git a/build/dist b/build/dist index 216914ec2..03d54ca39 100755 --- a/build/dist +++ b/build/dist @@ -179,7 +179,6 @@ mkdir -p "$DISTDIR/pid" mkdir -p "$DISTDIR/logs" mkdir -p "$DISTDIR/work" mkdir -p "$DISTDIR/externals/engines/spark" -mkdir -p "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars" echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE" echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE" echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE" @@ -195,9 +194,12 @@ cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine-$VERSION.jar" "$DISTDIR/externals/engines/spark" # Copy kyuubi tools -cp -r "$KYUUBI_HOME/tools/kubernetes/docker/" "$DISTDIR/tools/kubernetes" -cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/" -cp "$KYUUBI_HOME/tools/kubernetes/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars" +if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" ]]; then + mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes/docker" + mkdir -p "$DISTDIR/tools/spark-block-cleaner/jars" + cp -r "$KYUUBI_HOME/tools/spark-block-cleaner/kubernetes/" "$DISTDIR/tools/spark-block-cleaner/kubernetes/" + cp "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/spark-block-cleaner/jars/" +fi # Copy Kyuubi extension SPARK_MID_VERSION=${SPARK_VERSION%.*} diff --git a/pom.xml b/pom.xml index 107e14ac0..823e9df98 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,6 @@ kyuubi-main kyuubi-metrics kyuubi-zookeeper - tools/kubernetes/spark-block-cleaner pom @@ -1694,5 +1693,12 @@ dev/kyuubi-extension-spark_3.1 + + + spark-block-cleaner + + tools/spark-block-cleaner + + diff --git a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala deleted file mode 100644 index 0432969ef..000000000 --- a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.tools - -object Constants { - val CACHE_DIRS_KEY = "CACHE_DIRS" - val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME" - val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD" - val SLEEP_TIME_KEY = "SLEEP_TIME" - val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME" -} diff --git a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala deleted file mode 100644 index 642070041..000000000 --- a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.tools - -import java.io.File -import java.nio.file.{Files, Paths} -import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} - -import org.apache.kyuubi.Logging -import org.apache.kyuubi.tools.Constants._ -/* -* Spark storage shuffle data as the following structure. -* -* local-dir1/ -* blockmgr-uuid/ -* hash-sub-dir/ -* shuffle-data -* shuffle-index -* -* local-dir2/ -* blockmgr-uuid/ -* hash-sub-dir/ -* shuffle-data -* shuffle-index -* -* ... -*/ -object KubernetesSparkBlockCleaner extends Logging { - private val envMap = System.getenv() - - private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY, - "60").toInt - private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY, - "604800000").toLong - private val sleepTime = envMap.getOrDefault(SLEEP_TIME_KEY, - "3600000").toLong - private val deepCleanFileExpiredTime = envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY, - "432000000").toLong - private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) { - envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals("")) - } else { - throw new IllegalArgumentException(s"the env ${CACHE_DIRS_KEY} can not be null") - } - - def doClean(dir: File, time: Long) { - info(s"start clean ${dir.getName} with fileExpiredTime ${time}") - - // clean blockManager shuffle file - dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr")) - .foreach(blockManagerDir => { - - info(s"start check blockManager dir ${blockManagerDir.getName}") - // check blockManager directory - blockManagerDir.listFiles.filter(_.isDirectory).foreach(subDir => { - - info(s"start check sub dir ${subDir.getName}") - // check sub directory - subDir.listFiles.foreach(file => checkAndDeleteFIle(file, time)) - // delete empty sub directory - checkAndDeleteDir(subDir) - }) - // delete empty blockManager directory - checkAndDeleteDir(blockManagerDir) - }) - - // clean spark cache file - dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark")) - .foreach(cacheDir => { - info(s"start check cache dir ${cacheDir.getName}") - cacheDir.listFiles.foreach(file => checkAndDeleteFIle(file, time)) - // delete empty spark cache file - checkAndDeleteDir(cacheDir) - }) - } - - def checkAndDeleteFIle(file: File, time: Long): Unit = { - info(s"check file ${file.getName}") - if (System.currentTimeMillis() - file.lastModified() > time) { - if (file.delete()) { - info(s"delete file ${file.getName} success") - } else { - warn(s"delete file ${file.getName} fail") - } - } - } - - def checkAndDeleteDir(dir: File): Unit = { - if (dir.listFiles.isEmpty) { - if (dir.delete()) { - info(s"delete dir ${dir.getName} success") - } else { - warn(s"delete dir ${dir.getName} fail") - } - } - } - - import scala.sys.process._ - - def checkUsedCapacity(dir: String): Boolean = { - val used = (s"df ${dir}" #| s"grep ${dir}").!! - .split(" ").filter(_.endsWith("%")) { - 0 - }.replace("%", "") - info(s"${dir} now used ${used}% space") - - used.toInt > (100 - freeSpaceThreshold) - } - - def initializeConfiguration(): Unit = { - if (fileExpiredTime < 0) { - throw new IllegalArgumentException(s"the env ${FILE_EXPIRED_TIME_KEY} " + - s"should be greater than 0") - } - - if (deepCleanFileExpiredTime < 0) { - throw new IllegalArgumentException(s"the env ${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY} " + - s"should be greater than 0") - } - - if (sleepTime < 0) { - throw new IllegalArgumentException(s"the env ${SLEEP_TIME_KEY} " + - s"should be greater than 0") - } - - if (freeSpaceThreshold < 0 || freeSpaceThreshold > 100) { - throw new IllegalArgumentException(s"the env ${FREE_SPACE_THRESHOLD_KEY} " + - s"should between 0 and 100") - } - - info(s"finish initializing configuration, " + - s"use ${CACHE_DIRS_KEY}: ${cacheDirs.mkString(",")}, " + - s"${FILE_EXPIRED_TIME_KEY}: ${fileExpiredTime}, " + - s"${FREE_SPACE_THRESHOLD_KEY}: ${freeSpaceThreshold}, " + - s"${SLEEP_TIME_KEY}: ${sleepTime}, " + - s"${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY}: ${deepCleanFileExpiredTime}") - } - - def initializeThreadPool(): ThreadPoolExecutor = { - new ThreadPoolExecutor(cacheDirs.length, - cacheDirs.length * 2, - 0, - TimeUnit.SECONDS, - new LinkedBlockingQueue[Runnable]()) - } - - def main(args: Array[String]): Unit = { - initializeConfiguration() - val threadPool = initializeThreadPool() - try { - while (true) { - info("start clean job") - cacheDirs.foreach(pathStr => { - val path = Paths.get(pathStr) - - if (!Files.exists(path)) { - throw new IllegalArgumentException(s"this path ${pathStr} does not exists") - } - - if (!Files.isDirectory(path)) { - throw new IllegalArgumentException(s"this path ${pathStr} is not directory") - } - - // Clean up files older than $fileExpiredTime - threadPool.execute(() => { - doClean(path.toFile, fileExpiredTime) - }) - - if (checkUsedCapacity(pathStr)) { - info("start deep clean job") - // Clean up files older than $deepCleanFileExpiredTime - threadPool.execute(() => { - doClean(path.toFile, deepCleanFileExpiredTime) - }) - if (checkUsedCapacity(pathStr)) { - warn(s"after deep clean ${pathStr} " + - s"used space still higher than ${freeSpaceThreshold}") - } - } - }) - // Once $sleepTime - Thread.sleep(sleepTime) - } - } catch { - case exception: Exception => throw exception - } finally { - if (threadPool != null) { - threadPool.shutdown() - } - } - } -} diff --git a/tools/kubernetes/docker/spark-block-cleaner/Dockerfile b/tools/spark-block-cleaner/kubernetes/docker/Dockerfile similarity index 88% rename from tools/kubernetes/docker/spark-block-cleaner/Dockerfile rename to tools/spark-block-cleaner/kubernetes/docker/Dockerfile index 6ce054a08..d039dc355 100644 --- a/tools/kubernetes/docker/spark-block-cleaner/Dockerfile +++ b/tools/spark-block-cleaner/kubernetes/docker/Dockerfile @@ -23,7 +23,8 @@ RUN apt-get update && \ mkdir -p /log/cleanerLog COPY jars /opt/block-cleaner -COPY entrypoint.sh /opt/entrypoint.sh +COPY tools/spark-block-cleaner/jars /opt/block-cleaner +COPY tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh /opt/entrypoint.sh RUN chmod +x /opt/entrypoint.sh diff --git a/tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh b/tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh similarity index 100% rename from tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh rename to tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh diff --git a/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml b/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml similarity index 88% rename from tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml rename to tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml index f3757c3b8..24d3cf617 100644 --- a/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml +++ b/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml @@ -48,19 +48,19 @@ spec: # the target dirs which in container - name: CACHE_DIRS value: /data/data1,/data/data2 - # Cleaner will clean More distant block files + # Cleaner will clean More distant block files, seconds - name: FILE_EXPIRED_TIME - value: 604800000 - # Deep clean fileExpiredTime + value: 604800 + # Deep clean fileExpiredTime, seconds - name: DEEP_CLEAN_FILE_EXPIRED_TIME - value: 432000000 + value: 432000 # After first clean, if free Space low than threshold # trigger deep clean - name: FREE_SPACE_THRESHOLD value: 60 - # Cleaner clean sleep times after cleaning - - name: SLEEP_TIME - value: 3600000 + # Cleaner clean sleep times after cleaning, seconds + - name: SCHEDULE_INTERVAL + value: 3600 volumes: # Directory on the host which store block dirs - name: block-files-dir-1 diff --git a/tools/kubernetes/spark-block-cleaner/pom.xml b/tools/spark-block-cleaner/pom.xml similarity index 87% rename from tools/kubernetes/spark-block-cleaner/pom.xml rename to tools/spark-block-cleaner/pom.xml index e33cb4c70..7db8bda9c 100644 --- a/tools/kubernetes/spark-block-cleaner/pom.xml +++ b/tools/spark-block-cleaner/pom.xml @@ -22,16 +22,15 @@ kyuubi org.apache.kyuubi 1.3.0-SNAPSHOT - ../../../pom.xml + ../../pom.xml 4.0.0 spark-block-cleaner - Kyuubi Tool Kubernetes Spark Block Cleaner + Kyuubi Project Spark Block Cleaner jar - org.apache.kyuubi kyuubi-common @@ -46,6 +45,11 @@ test + + org.scalatest + scalatest_${scala.binary.version} + test + diff --git a/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala b/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala new file mode 100644 index 000000000..04b878737 --- /dev/null +++ b/tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.tools + +import java.io.File +import java.nio.file.{Files, Paths} +import java.util.concurrent.{CountDownLatch, Executors} + +import org.apache.kyuubi.Logging + +/* +* Spark storage shuffle data as the following structure. +* +* local-dir1/ +* blockmgr-uuid/ +* hash-sub-dir/ +* shuffle-data +* shuffle-index +* +* local-dir2/ +* blockmgr-uuid/ +* hash-sub-dir/ +* shuffle-data +* shuffle-index +* +* ... +*/ +object KubernetesSparkBlockCleaner extends Logging { + import KubernetesSparkBlockCleanerConstants._ + + private val envMap = System.getenv() + + private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY, + "60").toInt + private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY, + "604800").toLong * 1000 + private val scheduleInterval = envMap.getOrDefault(SCHEDULE_INTERVAL, + "3600").toLong * 1000 + private val deepCleanFileExpiredTime = envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY, + "432000").toLong * 1000 + private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) { + envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals("")) + } else { + throw new IllegalArgumentException(s"the env $CACHE_DIRS_KEY must be set") + } + private val isTesting = envMap.getOrDefault("kyuubi.testing", "false").toBoolean + checkConfiguration() + + /** + * one thread clean one dir + */ + private val threadPool = Executors.newFixedThreadPool(cacheDirs.length) + + private def checkConfiguration(): Unit = { + require(fileExpiredTime > 0, + s"the env $FILE_EXPIRED_TIME_KEY should be greater than 0") + require(deepCleanFileExpiredTime > 0, + s"the env $DEEP_CLEAN_FILE_EXPIRED_TIME_KEY should be greater than 0") + require(scheduleInterval > 0, + s"the env $SCHEDULE_INTERVAL should be greater than 0") + require(freeSpaceThreshold > 0 && freeSpaceThreshold < 100, + s"the env $FREE_SPACE_THRESHOLD_KEY should between 0 and 100") + require(cacheDirs.nonEmpty, s"the env $CACHE_DIRS_KEY must be set") + cacheDirs.foreach { dir => + val path = Paths.get(dir) + require(Files.exists(path), + s"the input cache dir: $dir does not exists") + require(Files.isDirectory(path), + s"the input cache dir: $dir should be a directory") + } + + info(s"finish initializing configuration, " + + s"use $CACHE_DIRS_KEY: ${cacheDirs.mkString(",")}, " + + s"$FILE_EXPIRED_TIME_KEY: $fileExpiredTime, " + + s"$FREE_SPACE_THRESHOLD_KEY: $freeSpaceThreshold, " + + s"$SCHEDULE_INTERVAL: $scheduleInterval, " + + s"$DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: $deepCleanFileExpiredTime") + } + + private def doClean(dir: File, time: Long) { + // clean blockManager shuffle file + dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr")) + .foreach { blockManagerDir => + info(s"start check blockManager dir ${blockManagerDir.getCanonicalPath}") + // check blockManager directory + val released = blockManagerDir.listFiles.filter(_.isDirectory).map { subDir => + debug(s"start check sub dir ${subDir.getCanonicalPath}") + // check sub directory + subDir.listFiles.map(file => checkAndDeleteFile(file, time)).sum + } + // delete empty blockManager directory and all empty sub directory + if (blockManagerDir.listFiles().forall( + subDir => subDir.isDirectory && subDir.listFiles().isEmpty)) { + blockManagerDir.listFiles().foreach(checkAndDeleteFile(_, time, true)) + checkAndDeleteFile(blockManagerDir, time, true) + } + info(s"finished clean blockManager dir ${blockManagerDir.getCanonicalPath}, " + + s"released space: ${released.sum / 1024 / 1024} MB.") + } + + // clean spark cache file + dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark")) + .foreach { cacheDir => + info(s"start check cache dir ${cacheDir.getCanonicalPath}") + val released = cacheDir.listFiles.map(file => checkAndDeleteFile(file, time)) + // delete empty spark cache file + checkAndDeleteFile(cacheDir, time, true) + info(s"finished clean cache dir ${cacheDir.getCanonicalPath}, " + + s"released space: ${released.sum / 1024 / 1024} MB.") + } + } + + private def checkAndDeleteFile(file: File, time: Long, isDir: Boolean = false): Long = { + debug(s"check file ${file.getName}") + val shouldDeleteFile = if (isDir) { + file.listFiles.isEmpty && (System.currentTimeMillis() - file.lastModified() > time) + } else { + System.currentTimeMillis() - file.lastModified() > time + } + val length = if (isDir) 0 else file.length() + if (shouldDeleteFile) { + if (file.delete()) { + debug(s"delete file ${file.getAbsolutePath} success") + return length + } else { + warn(s"delete file ${file.getAbsolutePath} fail") + } + } + 0L + } + + import scala.sys.process._ + + private def needToDeepClean(dir: String): Boolean = { + val used = (s"df $dir" #| s"grep $dir").!! + .split(" ").filter(_.endsWith("%")) { + 0 + }.replace("%", "") + info(s"$dir now used $used% space") + + used.toInt > (100 - freeSpaceThreshold) + } + + private def doCleanJob(dir: String): Unit = { + val startTime = System.currentTimeMillis() + val path = Paths.get(dir) + info(s"start clean job for $dir") + doClean(path.toFile, fileExpiredTime) + // re check if the disk has enough space + if (needToDeepClean(dir)) { + info(s"start deep clean job for $dir") + doClean(path.toFile, deepCleanFileExpiredTime) + if (needToDeepClean(dir)) { + warn(s"after deep clean $dir, used space still higher than $freeSpaceThreshold") + } + } + val finishedTime = System.currentTimeMillis() + info(s"clean job $dir finished, elapsed time: ${(finishedTime - startTime) / 1000} s.") + } + + def main(args: Array[String]): Unit = { + do { + info(s"start all clean job") + val startTime = System.currentTimeMillis() + val hasFinished = new CountDownLatch(cacheDirs.length) + cacheDirs.foreach { dir => + threadPool.execute(() => { + doCleanJob(dir) + hasFinished.countDown() + }) + } + hasFinished.await() + + val usedTime = System.currentTimeMillis() - startTime + info(s"finished to clean all dir, elapsed time $usedTime") + if (usedTime > scheduleInterval) { + warn(s"clean job elapsed time $usedTime which is greater than $scheduleInterval") + } else { + Thread.sleep(scheduleInterval - usedTime) + } + } while (!isTesting) + } +} + +object KubernetesSparkBlockCleanerConstants { + val CACHE_DIRS_KEY = "CACHE_DIRS" + val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME" + val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD" + val SCHEDULE_INTERVAL = "SCHEDULE_INTERVAL" + val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME" +} diff --git a/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala new file mode 100644 index 000000000..dfaa1f412 --- /dev/null +++ b/tools/spark-block-cleaner/src/test/scala/org.apache.kyuubi.tools/KubernetesSparkBlockCleanerSuite.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.tools + +import java.io.File +import java.nio.file.Files +import java.util.UUID + +import org.apache.kyuubi.{KyuubiFunSuite, Utils} + +class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite { + import KubernetesSparkBlockCleanerConstants._ + + private val rootDir = Utils.createTempDir() + private val cacheDir = Seq("1", "2").map(rootDir.resolve) + private val block1 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}") + private val block2 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}") + + // do not remove + private val subDir1 = new File(block1, "01") + // do not remove + private val data11 = new File(subDir1, "shuffle_0_0_0") + // remove + private val data12 = new File(subDir1, "shuffle_0_0_1") + + // remove + private val subDir2 = new File(block2, "02") + // remove + private val data21 = new File(subDir1, "shuffle_0_1_0") + + private def deleteRecursive(path: File): Unit = { + path.listFiles.foreach { f => + if (f.isDirectory) { + deleteRecursive(f) + } else { + f.delete() + } + } + path.delete() + } + + override def beforeAll(): Unit = { + super.beforeAll() + cacheDir.foreach(Files.createDirectories(_)) + + // create some dir + Files.createDirectories(block1.toPath) + // hash sub dir + Files.createDirectory(subDir1.toPath) + data11.createNewFile() + data11.setLastModified(System.currentTimeMillis() - 10) + data12.createNewFile() + Files.write(data12.toPath, "111".getBytes()) + data12.setLastModified(System.currentTimeMillis() - 10000000) + + Files.createDirectories(block2.toPath) + Files.createDirectory(subDir2.toPath) + subDir2.setLastModified(System.currentTimeMillis() - 10000000) + data21.createNewFile() + data21.setLastModified(System.currentTimeMillis() - 10000000) + } + + override def afterAll(): Unit = { + deleteRecursive(block1) + deleteRecursive(block2) + + super.afterAll() + } + + private def updateEnv(name: String, value: String): Unit = { + val env = System.getenv + val field = env.getClass.getDeclaredField("m") + field.setAccessible(true) + field.get(env).asInstanceOf[java.util.Map[String, String]].put(name, value) + } + + test("test clean") { + updateEnv(CACHE_DIRS_KEY, cacheDir.mkString(",")) + updateEnv(FILE_EXPIRED_TIME_KEY, "600") + updateEnv(SCHEDULE_INTERVAL, "1") + updateEnv("kyuubi.testing", "true") + + KubernetesSparkBlockCleaner.main(Array.empty) + + assert(block1.exists()) + assert(subDir1.exists()) + assert(data11.exists()) + assert(!data12.exists()) + + assert(block2.exists()) + assert(!subDir2.exists()) + assert(!data21.exists()) + } +}