[KYUUBI #768] [K8s] [Tool] tools shuffle-files-cleaner

### _Why are the changes needed?_
Add the Kyuubi tool cache-file-cleaner for Kubernetes.
This tool starts a DaemonSet on Kubernetes to clean up shuffle files and Spark cache files.
Configured through environment variables, cache-file-cleaner finds files older than the configured expiry time and deletes them to free disk space.
After the first cleaning pass, if the free capacity is still lower than the threshold, the cleaner starts a deep clean to free more disk space.
Along with cache-file-cleaner, add a Dockerfile, an entrypoint script, and a DaemonSet YAML file to help users run the tool.
A thread pool is used to delete the shuffle files.
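
The two cleaning passes described above come down to two small checks. The sketch below is only an illustration: `CleanerPolicySketch`, `isExpired`, and `needsDeepClean` are hypothetical names that are not part of this patch, and the default values are copied from the patch's environment defaults.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of the patch: it only mirrors the decision
// rules of the cleaner so they can be read in isolation.
object CleanerPolicySketch {
  // Defaults taken from the patch (milliseconds and percent).
  val fileExpiredTime: Long = TimeUnit.DAYS.toMillis(7)          // 604800000
  val deepCleanFileExpiredTime: Long = TimeUnit.DAYS.toMillis(5) // 432000000
  val freeSpaceThreshold: Int = 60

  // A file is expired when it has not been modified for longer than `expiry`.
  def isExpired(nowMs: Long, lastModifiedMs: Long, expiry: Long): Boolean =
    nowMs - lastModifiedMs > expiry

  // A deep clean is needed when used space exceeds (100 - threshold) percent,
  // that is, when free space has dropped below the threshold.
  def needsDeepClean(usedPercent: Int, threshold: Int = freeSpaceThreshold): Boolean =
    usedPercent > 100 - threshold
}
```

With these defaults, a file last modified six days ago survives the first pass but is removed by a deep clean, and a deep clean is triggered once a disk is more than 40% used.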

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #768 from zwangsheng/shuffle-cleaner.

Closes #768

3e3d5c65 [Binjie Yang] tini
2f7eab6e [Binjie Yang] dist
d19b9d0c [Binjie Yang] dist
3c8b7828 [Binjie Yang] dist
21bf5b4d [Binjie Yang] big change
4d803504 [Binjie Yang] rename to spark-block-cleaner & fix bug
ba6515d0 [Binjie Yang] do some change
1084feaa [Binjie Yang] delete empty dir
ada682da [Binjie Yang] big change
aabbf9ba [Binjie Yang] bugfix
a504448a [Binjie Yang] big change
0e0d5811 [Binjie Yang] modify
262fe314 [Binjie Yang] pom
ad9c533c [Binjie Yang] dist
0364f6e9 [Binjie Yang] pom
6fbb5cae [Binjie Yang] dist
797bcc39 [Binjie Yang] shuffle cleaner

Authored-by: Binjie Yang <2213335496@qq.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
Binjie Yang 2021-07-12 15:56:10 +08:00 committed by ulysses-you
parent 622783b4d1
commit 5e53748bb5
8 changed files with 425 additions and 0 deletions

View File

@ -179,6 +179,7 @@ mkdir -p "$DISTDIR/pid"
mkdir -p "$DISTDIR/logs"
mkdir -p "$DISTDIR/work"
mkdir -p "$DISTDIR/externals/engines/spark"
mkdir -p "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
@ -193,6 +194,11 @@ cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR
# Copy spark engines
cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine-$VERSION.jar" "$DISTDIR/externals/engines/spark"
# Copy kyuubi tools
cp -r "$KYUUBI_HOME/tools/kubernetes/docker/" "$DISTDIR/tools/kubernetes"
cp -r "$KYUUBI_HOME/kyuubi-assembly/target/scala-$SCALA_VERSION/jars/" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/"
cp "$KYUUBI_HOME/tools/kubernetes/spark-block-cleaner/target/spark-block-cleaner-$VERSION.jar" "$DISTDIR/tools/kubernetes/docker/spark-block-cleaner/jars"
# Copy Kyuubi extension
SPARK_MID_VERSION=${SPARK_VERSION%.*}
if [[ -f $"$KYUUBI_HOME/dev/kyuubi-extension-spark_$SPARK_MID_VERSION/target/kyuubi-extension-spark_$SPARK_MID_VERSION-$VERSION.jar" ]]; then

View File

@ -37,6 +37,7 @@
<module>kyuubi-main</module>
<module>kyuubi-metrics</module>
<module>kyuubi-zookeeper</module>
<module>tools/kubernetes/spark-block-cleaner</module>
</modules>
<packaging>pom</packaging>

View File

@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM openjdk:8-jre-slim
RUN apt-get update && \
apt-get install -y tini && \
mkdir /data && \
mkdir -p /opt/block-cleaner && \
mkdir -p /log/cleanerLog
COPY jars /opt/block-cleaner
COPY entrypoint.sh /opt/entrypoint.sh
RUN chmod +x /opt/entrypoint.sh
ENV CLEANER_CLASSPATH /opt/block-cleaner/*
ENTRYPOINT ["/opt/entrypoint.sh"]

View File

@ -0,0 +1,24 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# entrypoint for spark-block-cleaner
# shellcheck disable=SC2046
exec /usr/bin/tini -s -- java -cp "${CLASS_PATH}:${CLEANER_CLASSPATH}" \
org.apache.kyuubi.tools.KubernetesSparkBlockCleaner \
| tee /log/cleanerLog/cleaner$(date "+%Y%m%d%H%M%S").out

View File

@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: apps/v1
# A DaemonSet ensures that all (or some) Nodes run a copy of a Pod.
kind: DaemonSet
metadata:
  name: kyuubi-kubernetes-spark-block-cleaner
  # The namespace assigns the DaemonSet to the designated cluster resource
  namespace: default
spec:
  selector:
    matchLabels:
      name: block-cleaner
  template:
    metadata:
      labels:
        name: block-cleaner
    spec:
      containers:
        # Container image built from ./Dockerfile
        # TODO official Image
        - image: <image>
          name: cleaner
          volumeMounts:
            - name: block-files-dir-1
              mountPath: /data/data1
            - name: block-files-dir-2
              mountPath: /data/data2
            - name: cleaner-log
              mountPath: /log/cleanerLog
          env:
            # Environment variables that control how the cleaner runs
            # Target directories inside the container
            - name: CACHE_DIRS
              value: /data/data1,/data/data2
            # The cleaner deletes block files older than this (milliseconds)
            - name: FILE_EXPIRED_TIME
              value: "604800000"
            # File expiry time used by deep clean (milliseconds)
            - name: DEEP_CLEAN_FILE_EXPIRED_TIME
              value: "432000000"
            # After the first clean, if free space (percent) is lower than
            # this threshold, trigger a deep clean
            - name: FREE_SPACE_THRESHOLD
              value: "60"
            # How long the cleaner sleeps after each cleaning round (milliseconds)
            - name: SLEEP_TIME
              value: "3600000"
      volumes:
        # Directories on the host that store the block dirs
        - name: block-files-dir-1
          hostPath:
            path: /blockFilesDirs/data1
        - name: block-files-dir-2
          hostPath:
            path: /blockFilesDirs/data2
        # Directory on the host where the cleaner log is stored
        - name: cleaner-log
          hostPath:
            path: /logDir

View File

@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.3.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-block-cleaner</artifactId>
<name>Kyuubi Tool Kubernetes Spark Block Cleaner</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.tools
object Constants {
val CACHE_DIRS_KEY = "CACHE_DIRS"
val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME"
val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD"
val SLEEP_TIME_KEY = "SLEEP_TIME"
val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME"
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.tools
import java.io.File
import java.nio.file.{Files, Paths}
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.tools.Constants._
/*
* Spark stores shuffle data in the following structure.
*
* local-dir1/
* blockmgr-uuid/
* hash-sub-dir/
* shuffle-data
* shuffle-index
*
* local-dir2/
* blockmgr-uuid/
* hash-sub-dir/
* shuffle-data
* shuffle-index
*
* ...
*/
object KubernetesSparkBlockCleaner extends Logging {
private val envMap = System.getenv()
private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY,
"60").toInt
private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY,
"604800000").toLong
private val sleepTime = envMap.getOrDefault(SLEEP_TIME_KEY,
"3600000").toLong
private val deepCleanFileExpiredTime = envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY,
"432000000").toLong
private val cacheDirs = if (envMap.containsKey(CACHE_DIRS_KEY)) {
envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
} else {
throw new IllegalArgumentException(s"the env ${CACHE_DIRS_KEY} cannot be null")
}
def doClean(dir: File, time: Long): Unit = {
info(s"start clean ${dir.getName} with fileExpiredTime ${time}")
// clean blockManager shuffle file
dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr"))
.foreach(blockManagerDir => {
info(s"start check blockManager dir ${blockManagerDir.getName}")
// check blockManager directory
blockManagerDir.listFiles.filter(_.isDirectory).foreach(subDir => {
info(s"start check sub dir ${subDir.getName}")
// check sub directory
subDir.listFiles.foreach(file => checkAndDeleteFIle(file, time))
// delete empty sub directory
checkAndDeleteDir(subDir)
})
// delete empty blockManager directory
checkAndDeleteDir(blockManagerDir)
})
// clean spark cache file
dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark"))
.foreach(cacheDir => {
info(s"start check cache dir ${cacheDir.getName}")
cacheDir.listFiles.foreach(file => checkAndDeleteFIle(file, time))
// delete empty spark cache directory
checkAndDeleteDir(cacheDir)
})
}
def checkAndDeleteFIle(file: File, time: Long): Unit = {
info(s"check file ${file.getName}")
if (System.currentTimeMillis() - file.lastModified() > time) {
if (file.delete()) {
info(s"delete file ${file.getName} success")
} else {
warn(s"delete file ${file.getName} fail")
}
}
}
def checkAndDeleteDir(dir: File): Unit = {
if (dir.listFiles.isEmpty) {
if (dir.delete()) {
info(s"delete dir ${dir.getName} success")
} else {
warn(s"delete dir ${dir.getName} fail")
}
}
}
import scala.sys.process._
def checkUsedCapacity(dir: String): Boolean = {
// take the first "NN%" column from the df output line for this dir
val used = (s"df ${dir}" #| s"grep ${dir}").!!
.split(" ").filter(_.endsWith("%")).head
.replace("%", "")
info(s"${dir} now used ${used}% space")
used.toInt > (100 - freeSpaceThreshold)
}
def initializeConfiguration(): Unit = {
if (fileExpiredTime < 0) {
throw new IllegalArgumentException(s"the env ${FILE_EXPIRED_TIME_KEY} " +
s"should not be negative")
}
if (deepCleanFileExpiredTime < 0) {
throw new IllegalArgumentException(s"the env ${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY} " +
s"should not be negative")
}
if (sleepTime < 0) {
throw new IllegalArgumentException(s"the env ${SLEEP_TIME_KEY} " +
s"should not be negative")
}
if (freeSpaceThreshold < 0 || freeSpaceThreshold > 100) {
throw new IllegalArgumentException(s"the env ${FREE_SPACE_THRESHOLD_KEY} " +
s"should be between 0 and 100")
}
info(s"finish initializing configuration, " +
s"use ${CACHE_DIRS_KEY}: ${cacheDirs.mkString(",")}, " +
s"${FILE_EXPIRED_TIME_KEY}: ${fileExpiredTime}, " +
s"${FREE_SPACE_THRESHOLD_KEY}: ${freeSpaceThreshold}, " +
s"${SLEEP_TIME_KEY}: ${sleepTime}, " +
s"${DEEP_CLEAN_FILE_EXPIRED_TIME_KEY}: ${deepCleanFileExpiredTime}")
}
def initializeThreadPool(): ThreadPoolExecutor = {
new ThreadPoolExecutor(cacheDirs.length,
cacheDirs.length * 2,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable]())
}
def main(args: Array[String]): Unit = {
initializeConfiguration()
val threadPool = initializeThreadPool()
try {
while (true) {
info("start clean job")
cacheDirs.foreach(pathStr => {
val path = Paths.get(pathStr)
if (!Files.exists(path)) {
throw new IllegalArgumentException(s"this path ${pathStr} does not exist")
}
if (!Files.isDirectory(path)) {
throw new IllegalArgumentException(s"this path ${pathStr} is not a directory")
}
// Clean up files older than $fileExpiredTime
threadPool.execute(() => {
doClean(path.toFile, fileExpiredTime)
})
if (checkUsedCapacity(pathStr)) {
info("start deep clean job")
// Clean up files older than $deepCleanFileExpiredTime
threadPool.execute(() => {
doClean(path.toFile, deepCleanFileExpiredTime)
})
if (checkUsedCapacity(pathStr)) {
warn(s"after deep clean ${pathStr} " +
s"free space is still lower than ${freeSpaceThreshold}%")
}
}
})
// Sleep for $sleepTime milliseconds before the next cleaning round
Thread.sleep(sleepTime)
}
} catch {
case exception: Exception => throw exception
} finally {
if (threadPool != null) {
threadPool.shutdown()
}
}
}
}