diff --git a/build/dist b/build/dist
index be22bbc76..216914ec2 100755
--- a/build/dist
+++ b/build/dist
@@ -179,6 +179,7 @@ mkdir -p "$DISTDIR/pid"
mkdir -p "$DISTDIR/logs"
mkdir -p "$DISTDIR/work"
mkdir -p "$DISTDIR/externals/engines/spark"
+mkdir -p "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
@@ -193,6 +194,11 @@ cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR
# Copy spark engines
cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine-$VERSION.jar" "$DISTDIR/externals/engines/spark"
+# Copy kyuubi tools
+cp -r "$KYUUBI_HOME/tools/kubernetes/docker/" "$DISTDIR/tools/kubernetes"
+cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/"
+cp "$KYUUBI_HOME/tools/kubernetes/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
+
# Copy Kyuubi extension
SPARK_MID_VERSION=${SPARK_VERSION%.*}
if [[ -f $"$KYUUBI_HOME/dev/kyuubi-extension-spark_$SPARK_MID_VERSION/target/kyuubi-extension-spark_$SPARK_MID_VERSION-$VERSION.jar" ]]; then
diff --git a/pom.xml b/pom.xml
index ca6bad6e8..01e6b2bba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
kyuubi-main
kyuubi-metrics
kyuubi-zookeeper
+ tools/kubernetes/spark-block-cleaner
pom
diff --git a/tools/kubernetes/docker/spark-block-cleaner/Dockerfile b/tools/kubernetes/docker/spark-block-cleaner/Dockerfile
new file mode 100644
index 000000000..6ce054a08
--- /dev/null
+++ b/tools/kubernetes/docker/spark-block-cleaner/Dockerfile
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+FROM openjdk:8-jre-slim
+
+RUN apt-get update && \
+ apt install -y tini && \
+ mkdir /data && \
+ mkdir -p /opt/block-cleaner && \
+ mkdir -p /log/cleanerLog
+
+COPY jars /opt/block-cleaner
+COPY entrypoint.sh /opt/entrypoint.sh
+
+RUN chmod +x /opt/entrypoint.sh
+
+ENV CLEANER_CLASSPATH /opt/block-cleaner/*
+
+ENTRYPOINT ["/opt/entrypoint.sh"]
diff --git a/tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh b/tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh
new file mode 100644
index 000000000..63a6e7c9f
--- /dev/null
+++ b/tools/kubernetes/docker/spark-block-cleaner/entrypoint.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# entrypoint for spark-block-cleaner
+
+# shellcheck disable=SC2046
+exec /usr/bin/tini -s -- java -cp "${CLASS_PATH}:${CLEANER_CLASSPATH}" \
+ org.apache.kyuubi.tools.KubernetesSparkBlockCleaner \
+ | tee /log/cleanerLog/cleaner$(date "+%Y%m%d%H%M%S").out
diff --git a/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml b/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml
new file mode 100644
index 000000000..f3757c3b8
--- /dev/null
+++ b/tools/kubernetes/docker/spark-block-cleaner/spark-block-cleaner.yml
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: apps/v1
+# A DaemonSet ensures that all (or some) Nodes run a copy of a Pod.
+kind: DaemonSet
+metadata:
+ name: kyuubi-kubernetes-spark-block-cleaner
+ # NameSpace help assigned daemonSet to the designated cluster resource
+ namespace: default
+spec:
+ selector:
+ matchLabels:
+ name: block-cleaner
+ template:
+ metadata:
+ labels:
+ name: block-cleaner
+ spec:
+ containers:
+ # Container image which build by ./Dockerfile
+ # TODO official Image
+ - image:
+ name: cleaner
+ volumeMounts:
+ - name: block-files-dir-1
+ mountPath: /data/data1
+ - name: block-files-dir-2
+ mountPath: /data/data2
+ - name: cleaner-log
+ mountPath: /log/cleanerLog
+ env:
+ # Set env to manager cleaner running
+ # the target dirs which in container
+ - name: CACHE_DIRS
+ value: /data/data1,/data/data2
+ # Cleaner will clean More distant block files
+ - name: FILE_EXPIRED_TIME
+ value: 604800000
+ # Deep clean fileExpiredTime
+ - name: DEEP_CLEAN_FILE_EXPIRED_TIME
+ value: 432000000
+ # After first clean, if free Space low than threshold
+ # trigger deep clean
+ - name: FREE_SPACE_THRESHOLD
+ value: 60
+ # Cleaner clean sleep times after cleaning
+ - name: SLEEP_TIME
+ value: 3600000
+ volumes:
+ # Directory on the host which store block dirs
+ - name: block-files-dir-1
+ hostPath:
+ path: /blockFilesDirs/data1
+ - name: block-files-dir-2
+ hostPath:
+ path: /blockFilesDirs/data2
+ # Directory on the host which you want to store clean log
+ - name: cleaner-log
+ hostPath:
+ path: /logDir
diff --git a/tools/kubernetes/spark-block-cleaner/pom.xml b/tools/kubernetes/spark-block-cleaner/pom.xml
new file mode 100644
index 000000000..e33cb4c70
--- /dev/null
+++ b/tools/kubernetes/spark-block-cleaner/pom.xml
@@ -0,0 +1,55 @@
+
+
+
+
+ kyuubi
+ org.apache.kyuubi
+ 1.3.0-SNAPSHOT
+ ../../../pom.xml
+
+ 4.0.0
+
+ spark-block-cleaner
+ Kyuubi Tool Kubernetes Spark Block Cleaner
+ jar
+
+
+
+
+ org.apache.kyuubi
+ kyuubi-common
+ ${project.version}
+
+
+
+ org.apache.kyuubi
+ kyuubi-common
+ ${project.version}
+ test-jar
+ test
+
+
+
+
+
+ target/scala-${scala.binary.version}/classes
+ target/scala-${scala.binary.version}/test-classes
+
+
diff --git a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala
new file mode 100644
index 000000000..0432969ef
--- /dev/null
+++ b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/Constants.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tools
+
+object Constants {
+ val CACHE_DIRS_KEY = "CACHE_DIRS"
+ val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME"
+ val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD"
+ val SLEEP_TIME_KEY = "SLEEP_TIME"
+ val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME"
+}
diff --git a/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala
new file mode 100644
index 000000000..642070041
--- /dev/null
+++ b/tools/kubernetes/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.tools
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+
+import org.apache.kyuubi.Logging
+import org.apache.kyuubi.tools.Constants._
+/*
+* Spark storage shuffle data as the following structure.
+*
+* local-dir1/
+* blockmgr-uuid/
+* hash-sub-dir/
+* shuffle-data
+* shuffle-index
+*
+* local-dir2/
+* blockmgr-uuid/
+* hash-sub-dir/
+* shuffle-data
+* shuffle-index
+*
+* ...
+*/
+object KubernetesSparkBlockCleaner extends Logging {
+ private val envMap = System.getenv()
+
+ private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY,
+ "60").toInt
+ private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY,
+ "604800000").toLong
+ private val sleepTime = envMap.getOrDefault(SLEEP_TIME_KEY,
+ "3600000").toLong
+ private val deepCleanFileExpiredTime = envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY,
+ "432000000").toLong
+ private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) {
+ envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
+ } else {
+ throw new IllegalArgumentException(s"the env ${CACHE_DIRS_KEY} can not be null")
+ }
+
+ def doClean(dir: File, time: Long) {
+ info(s"start clean ${dir.getName} with fileExpiredTime ${time}")
+
+ // clean blockManager shuffle file
+ dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr"))
+ .foreach(blockManagerDir => {
+
+ info(s"start check blockManager dir ${blockManagerDir.getName}")
+ // check blockManager directory
+ blockManagerDir.listFiles.filter(_.isDirectory).foreach(subDir => {
+
+ info(s"start check sub dir ${subDir.getName}")
+ // check sub directory
+ subDir.listFiles.foreach(file => checkAndDeleteFIle(file, time))
+ // delete empty sub directory
+ checkAndDeleteDir(subDir)
+ })
+ // delete empty blockManager directory
+ checkAndDeleteDir(blockManagerDir)
+ })
+
+ // clean spark cache file
+ dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark"))
+ .foreach(cacheDir => {
+ info(s"start check cache dir ${cacheDir.getName}")
+ cacheDir.listFiles.foreach(file => checkAndDeleteFIle(file, time))
+ // delete empty spark cache file
+ checkAndDeleteDir(cacheDir)
+ })
+ }
+
+ def checkAndDeleteFIle(file: File, time: Long): Unit = {
+ info(s"check file ${file.getName}")
+ if (System.currentTimeMillis() - file.lastModified() > time) {
+ if (file.delete()) {
+ info(s"delete file ${file.getName} success")
+ } else {
+ warn(s"delete file ${file.getName} fail")
+ }
+ }
+ }
+
+ def checkAndDeleteDir(dir: File): Unit = {
+ if (dir.listFiles.isEmpty) {
+ if (dir.delete()) {
+ info(s"delete dir ${dir.getName} success")
+ } else {
+ warn(s"delete dir ${dir.getName} fail")
+ }
+ }
+ }
+
+ import scala.sys.process._
+
+ def checkUsedCapacity(dir: String): Boolean = {
+ val used = (s"df ${dir}" #| s"grep ${dir}").!!
+ .split(" ").filter(_.endsWith("%")) {
+ 0
+ }.replace("%", "")
+ info(s"${dir} now used ${used}% space")
+
+ used.toInt > (100 - freeSpaceThreshold)
+ }
+
+ def initializeConfiguration(): Unit = {
+ if (fileExpiredTime < 0) {
+ throw new IllegalArgumentException(s"the env ${FILE_EXPIRED_TIME_KEY} " +
+ s"should be greater than 0")
+ }
+
+ if (deepCleanFileExpiredTime < 0) {
+ throw new IllegalArgumentException(s"the env ${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY} " +
+ s"should be greater than 0")
+ }
+
+ if (sleepTime < 0) {
+ throw new IllegalArgumentException(s"the env ${SLEEP_TIME_KEY} " +
+ s"should be greater than 0")
+ }
+
+ if (freeSpaceThreshold < 0 || freeSpaceThreshold > 100) {
+ throw new IllegalArgumentException(s"the env ${FREE_SPACE_THRESHOLD_KEY} " +
+ s"should between 0 and 100")
+ }
+
+ info(s"finish initializing configuration, " +
+ s"use ${CACHE_DIRS_KEY}: ${cacheDirs.mkString(",")}, " +
+ s"${FILE_EXPIRED_TIME_KEY}: ${fileExpiredTime}, " +
+ s"${FREE_SPACE_THRESHOLD_KEY}: ${freeSpaceThreshold}, " +
+ s"${SLEEP_TIME_KEY}: ${sleepTime}, " +
+ s"${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY}: ${deepCleanFileExpiredTime}")
+ }
+
+ def initializeThreadPool(): ThreadPoolExecutor = {
+ new ThreadPoolExecutor(cacheDirs.length,
+ cacheDirs.length * 2,
+ 0,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable]())
+ }
+
+ def main(args: Array[String]): Unit = {
+ initializeConfiguration()
+ val threadPool = initializeThreadPool()
+ try {
+ while (true) {
+ info("start clean job")
+ cacheDirs.foreach(pathStr => {
+ val path = Paths.get(pathStr)
+
+ if (!Files.exists(path)) {
+ throw new IllegalArgumentException(s"this path ${pathStr} does not exists")
+ }
+
+ if (!Files.isDirectory(path)) {
+ throw new IllegalArgumentException(s"this path ${pathStr} is not directory")
+ }
+
+ // Clean up files older than $fileExpiredTime
+ threadPool.execute(() => {
+ doClean(path.toFile, fileExpiredTime)
+ })
+
+ if (checkUsedCapacity(pathStr)) {
+ info("start deep clean job")
+ // Clean up files older than $deepCleanFileExpiredTime
+ threadPool.execute(() => {
+ doClean(path.toFile, deepCleanFileExpiredTime)
+ })
+ if (checkUsedCapacity(pathStr)) {
+ warn(s"after deep clean ${pathStr} " +
+ s"used space still higher than ${freeSpaceThreshold}")
+ }
+ }
+ })
+ // Once $sleepTime
+ Thread.sleep(sleepTime)
+ }
+ } catch {
+ case exception: Exception => throw exception
+ } finally {
+ if (threadPool != null) {
+ threadPool.shutdown()
+ }
+ }
+ }
+}