[KYUUBI #5941] Drop Kubernetes Block Cleaner Tool from Kyuubi

# 🔍 Description
## Issue References 🔗

This pull request fixes #5941

## Describe Your Solution 🔧

This tool originally aimed to support shuffle data cleanup for Spark on Kubernetes, but its limitations became more and more apparent over time, so let's drop it.
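
For background, the disk-overflow scenario the tool addressed (executor pods deleted without cleaning block files from `hostPath`-backed local dirs, see the removed docs below) can instead be avoided by backing Spark local dirs with `emptyDir` volumes, which Kubernetes reclaims together with the pod. A minimal sketch of such a submission, not part of this change; the API server address, image, and application jar are placeholders:

```shell
# Hypothetical example: mount an emptyDir volume as a Spark local dir so
# shuffle/block files are removed automatically when the executor pod is deleted.
spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<spark-image> \
  --conf spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.path=/data/spark-local \
  --conf spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.readOnly=false \
  <app-jar>
```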

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #5942 from zwangsheng/KYUUBI#5941.

Closes #5941

23bf14f37 [Cheng Pan] Update docs/tools/spark_block_cleaner.md
1c3350186 [zwangsheng] fix comment
0bdbb1104 [zwangsheng] nit
0a5aa2bfa [zwangsheng] fix comments

Lead-authored-by: zwangsheng <binjieyang@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Authored by zwangsheng on 2024-01-04 15:58:10 +08:00; committed by Cheng Pan
parent 9fefd47e1f
commit c6bba915b2
14 changed files with 8 additions and 688 deletions

.github/labeler.yml

@@ -129,8 +129,7 @@
       '.dockerignore',
       'bin/docker-image-tool.sh',
       'docker/**/*',
-      'integration-tests/kyuubi-kubernetes-it/**/*',
-      'tools/spark-block-cleaner/**/*'
+      'integration-tests/kyuubi-kubernetes-it/**/*'
     ]
 "module:metrics":
@@ -164,8 +163,7 @@
   - changed-files:
     - any-glob-to-any-file: [
         'externals/kyuubi-spark-sql-engine/**/*',
-        'extensions/spark/**/*',
-        'tools/spark-block-cleaner/**/*'
+        'extensions/spark/**/*'
       ]
 "module:extensions":

.github/workflows/license.yml

@@ -44,7 +44,7 @@ jobs:
           check-latest: false
       - run: >-
           build/mvn org.apache.rat:apache-rat-plugin:check
-          -Ptpcds -Pspark-block-cleaner -Pkubernetes-it
+          -Ptpcds -Pkubernetes-it
           -Pspark-3.1 -Pspark-3.2 -Pspark-3.3 -Pspark-3.4 -Pspark-3.5
       - name: Upload rat report
         if: failure()

.github/workflows/style.yml

@@ -34,7 +34,7 @@ jobs:
     strategy:
       matrix:
         profiles:
-          - '-Pflink-provided,hive-provided,spark-provided,spark-block-cleaner,spark-3.5,spark-3.4,spark-3.3,spark-3.2,tpcds,kubernetes-it'
+          - '-Pflink-provided,hive-provided,spark-provided,spark-3.5,spark-3.4,spark-3.3,spark-3.2,tpcds,kubernetes-it'
 
     steps:
       - uses: actions/checkout@v4
@@ -65,7 +65,7 @@ jobs:
         if: steps.modules-check.conclusion == 'success' && steps.modules-check.outcome == 'failure'
         run: |
           MVN_OPT="-DskipTests -Dorg.slf4j.simpleLogger.defaultLogLevel=warn -Dmaven.javadoc.skip=true -Drat.skip=true -Dscalastyle.skip=true -Dspotless.check.skip"
-          build/mvn clean install ${MVN_OPT} -Pflink-provided,hive-provided,spark-provided,spark-block-cleaner,spark-3.2,tpcds
+          build/mvn clean install ${MVN_OPT} -Pflink-provided,hive-provided,spark-provided,spark-3.2,tpcds
           build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-extension-spark-3-1 -Pspark-3.1
           build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-extension-spark-3-3,extensions/spark/kyuubi-spark-connector-hive -Pspark-3.3
           build/mvn clean install ${MVN_OPT} -pl extensions/spark/kyuubi-extension-spark-3-4 -Pspark-3.4

build/dist

@@ -330,14 +330,6 @@ for jar in $(ls "$DISTDIR/jars/"); do
   fi
 done
 
-# Copy kyuubi tools
-if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" ]]; then
-  mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes"
-  mkdir -p "$DISTDIR/tools/spark-block-cleaner/jars"
-  cp -r "$KYUUBI_HOME"/tools/spark-block-cleaner/kubernetes/* "$DISTDIR/tools/spark-block-cleaner/kubernetes/"
-  cp "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/tools/spark-block-cleaner/jars/"
-fi
-
 # Copy Kyuubi Spark extension
 SPARK_EXTENSION_VERSIONS=('3-1' '3-2' '3-3' '3-4' '3-5')
 # shellcheck disable=SC2068

dev/reformat

@@ -20,7 +20,7 @@ set -x
 
 KYUUBI_HOME="$(cd "`dirname "$0"`/.."; pwd)"
 
-PROFILES="-Pflink-provided,hive-provided,spark-provided,spark-block-cleaner,spark-3.5,spark-3.4,spark-3.3,spark-3.2,spark-3.1,tpcds,kubernetes-it"
+PROFILES="-Pflink-provided,hive-provided,spark-provided,spark-3.5,spark-3.4,spark-3.3,spark-3.2,spark-3.1,tpcds,kubernetes-it"
 
 # python style checks rely on `black` in path
 if ! command -v black &> /dev/null

docs/tools/spark_block_cleaner.md

@@ -17,119 +17,5 @@
 # Kubernetes Tools Spark Block Cleaner
 
-## Requirements
-
-You'd better have cognition upon the following things when you want to use spark-block-cleaner.
-
-* Read this article
-* An active Kubernetes cluster
-* [Kubectl](https://kubernetes.io/docs/reference/kubectl/overview/)
-* [Docker](https://www.docker.com/)
-
-## Scenes
-
-When you're using Spark On Kubernetes with Client mode and don't use `emptyDir` for Spark `local-dir` type, you may face the same scenario that executor pods deleted without clean all the Block files. It may cause disk overflow.
-
-Therefore, we chose to use Spark Block Cleaner to clear the block files accumulated by Spark.
-
-## Principle
-
-When deploying Spark Block Cleaner, we will configure volumes for the destination folder. Spark Block Cleaner will perceive the folder by the parameter `CACHE_DIRS`.
-
-Spark Block Cleaner will clear the perceived folder in a fixed loop(which can be configured by `SCHEDULE_INTERVAL`). And Spark Block Cleaner will select folder start with `blockmgr` and `spark` for deletion using the logic Spark uses to create those folders.
-
-Before deleting those files, Spark Block Cleaner will determine whether it is a recently modified file(depending on whether the file has not been acted on within the specified time which configured by `FILE_EXPIRED_TIME`). Only delete files those beyond that time interval.
-
-And Spark Block Cleaner will check the disk utilization after clean, if the remaining space is less than the specified value(control by `FREE_SPACE_THRESHOLD`), will trigger deep clean(which file expired time control by `DEEP_CLEAN_FILE_EXPIRED_TIME`).
-
-## Usage
-
-Before you start using Spark Block Cleaner, you should build its docker images.
-
-### Build Block Cleaner Docker Image
-
-In the `KYUUBI_HOME` directory, you can use the following cmd to build docker image.
-
-```shell
-docker build ./tools/spark-block-cleaner/kubernetes/docker
-```
-
-### Modify spark-block-cleaner.yml
-
-You need to modify the `${KYUUBI_HOME}/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml` to fit your current environment.
-
-In Kyuubi tools, we recommend using `DaemonSet` to start, and we offer default yaml file in daemonSet way.
-
-Base file structure:
-
-```yaml
-apiVersion
-kind
-metadata
-  name
-  namespace
-spec
-  select
-  template
-    metadata
-    spce
-      containers
-        - image
-        - volumeMounts
-        - env
-      volumes
-```
-
-You can use affect the performance of Spark Block Cleaner through configure parameters in containers env part of `spark-block-cleaner.yml`.
-
-```yaml
-env:
-  - name: CACHE_DIRS
-    value: /data/data1,/data/data2
-  - name: FILE_EXPIRED_TIME
-    value: 604800
-  - name: DEEP_CLEAN_FILE_EXPIRED_TIME
-    value: 432000
-  - name: FREE_SPACE_THRESHOLD
-    value: 60
-  - name: SCHEDULE_INTERVAL
-    value: 3600
-```
-
-The most important thing, configure volumeMounts and volumes corresponding to Spark local-dirs.
-
-For example, Spark use /spark/shuffle1 as local-dir, you can configure like:
-
-```yaml
-volumes:
-  - name: block-files-dir-1
-    hostPath:
-      path: /spark/shuffle1
-```
-
-```yaml
-volumeMounts:
-  - name: block-files-dir-1
-    mountPath: /data/data1
-```
-
-```yaml
-env:
-  - name: CACHE_DIRS
-    value: /data/data1
-```
-
-### Start daemonSet
-
-After you finishing modifying the above, you can use the following command `kubectl apply -f ${KYUUBI_HOME}/tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml` to start daemonSet.
-
-## Related parameters
-
-| Name                         | Default                 | unit    | Meaning                                                                                                                 |
-|------------------------------|-------------------------|---------|-----------------------------------------------------------------------------------------------------------------------|
-| CACHE_DIRS                   | /data/data1,/data/data2 |         | The target dirs in container path which will clean block files.                                                        |
-| FILE_EXPIRED_TIME            | 604800                  | seconds | Cleaner will clean the block files which current time - last modified time more than the fileExpiredTime.              |
-| DEEP_CLEAN_FILE_EXPIRED_TIME | 432000                  | seconds | Deep clean will clean the block files which current time - last modified time more than the deepCleanFileExpiredTime.  |
-| FREE_SPACE_THRESHOLD         | 60                      | %       | After first clean, if free Space low than threshold trigger deep clean.                                                |
-| SCHEDULE_INTERVAL            | 3600                    | seconds | Cleaner sleep between cleaning.                                                                                        |
+**Note**:
+
+This tool has been removed since Kyuubi 1.9.0.

pom.xml

@@ -2376,13 +2376,6 @@
       </properties>
     </profile>
 
-    <profile>
-      <id>spark-block-cleaner</id>
-      <modules>
-        <module>tools/spark-block-cleaner</module>
-      </modules>
-    </profile>
-
     <profile>
       <id>spotless-python</id>
       <properties>

tools/spark-block-cleaner/kubernetes/docker/Dockerfile

@@ -1,34 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
FROM eclipse-temurin:8-jdk-focal
RUN apt-get update && \
apt install -y tini && \
rm -rf /var/cache/apt/* && \
mkdir /data && \
mkdir -p /opt/block-cleaner && \
mkdir -p /log/cleanerLog
COPY jars /opt/block-cleaner
COPY tools/spark-block-cleaner/jars /opt/block-cleaner
COPY tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh /opt/entrypoint.sh
RUN chmod +x /opt/entrypoint.sh
ENV CLEANER_CLASSPATH /opt/block-cleaner/*
ENTRYPOINT ["/opt/entrypoint.sh"]

tools/spark-block-cleaner/kubernetes/docker/entrypoint.sh

@@ -1,23 +0,0 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# entrypoint for spark-block-cleaner
# shellcheck disable=SC2046
exec /usr/bin/tini -s -- java -cp "${CLASS_PATH}:${CLEANER_CLASSPATH}" \
org.apache.kyuubi.tools.KubernetesSparkBlockCleaner

tools/spark-block-cleaner/kubernetes/spark-block-cleaner.yml

@@ -1,75 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: apps/v1
# A DaemonSet ensures that all (or some) Nodes run a copy of a Pod.
kind: DaemonSet
metadata:
name: kyuubi-kubernetes-spark-block-cleaner
# NameSpace help assigned daemonSet to the designated cluster resource
namespace: default
spec:
selector:
matchLabels:
name: block-cleaner
template:
metadata:
labels:
name: block-cleaner
spec:
containers:
# Container image which build by Dockerfile
# TODO official Image
- image: <image>
name: cleaner
volumeMounts:
- name: block-files-dir-1
mountPath: /data/data1
- name: block-files-dir-2
mountPath: /data/data2
- name: cleaner-log
mountPath: /log/cleanerLog
env:
# Set env to manager cleaner running
# the target dirs which in container
- name: CACHE_DIRS
value: /data/data1,/data/data2
# Cleaner will clean More distant block files, seconds
- name: FILE_EXPIRED_TIME
value: 604800
# Deep clean fileExpiredTime, seconds
- name: DEEP_CLEAN_FILE_EXPIRED_TIME
value: 432000
# After first clean, if free Space low than threshold
# trigger deep clean
- name: FREE_SPACE_THRESHOLD
value: 60
# Cleaner clean sleep times after cleaning, seconds
- name: SCHEDULE_INTERVAL
value: 3600
volumes:
# Directory on the host which store block dirs
- name: block-files-dir-1
hostPath:
path: /blockFilesDirs/data1
- name: block-files-dir-2
hostPath:
path: /blockFilesDirs/data2
# Directory on the host which you want to store clean log
- name: cleaner-log
hostPath:
path: /logDir

tools/spark-block-cleaner/pom.xml

@@ -1,53 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-parent</artifactId>
<version>1.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>spark-block-cleaner_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Project Spark Block Cleaner</name>
<url>https://kyuubi.apache.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>

tools/spark-block-cleaner/src/main/resources/log4j-block-cleaner.properties

@@ -1,33 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, logFile
### console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %d{yyyy} %p %c{2}: %m%n
### logFile
log4j.appender.logFile=org.apache.log4j.RollingFileAppender
log4j.appender.logFile.File=/logs/spark-block-cleaner-log/cleaner-log.out
log4j.appender.logFile.MaxFileSize=20MB
log4j.appender.logFile.MaxBackupIndex=5
log4j.appender.logFile.layout=org.apache.log4j.PatternLayout
log4j.appender.logFile.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %p %c{2}: %m%n

tools/spark-block-cleaner/src/main/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleaner.scala

@@ -1,223 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.tools
import java.io.File
import java.nio.file.{Files, Paths}
import java.util.concurrent.{CountDownLatch, Executors}
import scala.util.control.NonFatal
import org.apache.log4j.PropertyConfigurator
import org.apache.kyuubi.Logging
/*
* Spark storage shuffle data as the following structure.
*
* local-dir1/
* blockmgr-uuid/
* hash-sub-dir/
* shuffle-data
* shuffle-index
*
* local-dir2/
* blockmgr-uuid/
* hash-sub-dir/
* shuffle-data
* shuffle-index
*
* ...
*/
object KubernetesSparkBlockCleaner extends Logging {
import KubernetesSparkBlockCleanerConstants._
private val envMap = System.getenv()
PropertyConfigurator.configure(
Thread.currentThread().getContextClassLoader.getResource("log4j-block-cleaner.properties"))
private val freeSpaceThreshold = envMap.getOrDefault(FREE_SPACE_THRESHOLD_KEY, "60").toInt
private val fileExpiredTime = envMap.getOrDefault(FILE_EXPIRED_TIME_KEY, "604800").toLong * 1000
private val scheduleInterval = envMap.getOrDefault(SCHEDULE_INTERVAL, "3600").toLong * 1000
private val deepCleanFileExpiredTime =
envMap.getOrDefault(DEEP_CLEAN_FILE_EXPIRED_TIME_KEY, "432000").toLong * 1000
private val cacheDirs =
if (envMap.containsKey(CACHE_DIRS_KEY)) {
envMap.get(CACHE_DIRS_KEY).split(",").filter(!_.equals(""))
} else {
throw new IllegalArgumentException(s"the env $CACHE_DIRS_KEY must be set")
}
private val isTesting = envMap.getOrDefault("kyuubi.testing", "false").toBoolean
checkConfiguration()
/**
* one thread clean one dir
*/
private val threadPool = Executors.newFixedThreadPool(cacheDirs.length)
private def checkConfiguration(): Unit = {
require(fileExpiredTime > 0, s"the env $FILE_EXPIRED_TIME_KEY should be greater than 0")
require(
deepCleanFileExpiredTime > 0,
s"the env $DEEP_CLEAN_FILE_EXPIRED_TIME_KEY should be greater than 0")
require(scheduleInterval > 0, s"the env $SCHEDULE_INTERVAL should be greater than 0")
require(
freeSpaceThreshold > 0 && freeSpaceThreshold < 100,
s"the env $FREE_SPACE_THRESHOLD_KEY should between 0 and 100")
require(cacheDirs.nonEmpty, s"the env $CACHE_DIRS_KEY must be set")
cacheDirs.foreach { dir =>
val path = Paths.get(dir)
require(Files.exists(path), s"the input cache dir: $dir does not exists")
require(Files.isDirectory(path), s"the input cache dir: $dir should be a directory")
}
info(s"finish initializing configuration, " +
s"use $CACHE_DIRS_KEY: ${cacheDirs.mkString(",")}, " +
s"$FILE_EXPIRED_TIME_KEY: $fileExpiredTime, " +
s"$FREE_SPACE_THRESHOLD_KEY: $freeSpaceThreshold, " +
s"$SCHEDULE_INTERVAL: $scheduleInterval, " +
s"$DEEP_CLEAN_FILE_EXPIRED_TIME_KEY: $deepCleanFileExpiredTime")
}
private def doClean(dir: File, time: Long) {
// clean blockManager shuffle file
dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("blockmgr"))
.foreach { blockManagerDir =>
info(s"start check blockManager dir ${blockManagerDir.getCanonicalPath}")
// check blockManager directory
val released = blockManagerDir.listFiles.filter(_.isDirectory).map { subDir =>
debug(s"start check sub dir ${subDir.getCanonicalPath}")
// check sub directory
subDir.listFiles.map(file => checkAndDeleteFile(file, time)).sum
}
// delete empty blockManager directory and all empty sub directory
if (blockManagerDir.listFiles().forall(subDir =>
subDir.isDirectory && subDir.listFiles().isEmpty)) {
blockManagerDir.listFiles().foreach(checkAndDeleteFile(_, time, true))
checkAndDeleteFile(blockManagerDir, time, true)
}
info(s"finished clean blockManager dir ${blockManagerDir.getCanonicalPath}, " +
s"released space: ${released.sum / 1024 / 1024} MB")
}
// clean spark cache file
dir.listFiles.filter(_.isDirectory).filter(_.getName.startsWith("spark"))
.foreach { cacheDir =>
info(s"start check cache dir ${cacheDir.getCanonicalPath}")
val released = cacheDir.listFiles.map(file => checkAndDeleteFile(file, time))
// delete empty spark cache file
checkAndDeleteFile(cacheDir, time, true)
info(s"finished clean cache dir ${cacheDir.getCanonicalPath}, " +
s"released space: ${released.sum / 1024 / 1024} MB")
}
}
private def checkAndDeleteFile(file: File, time: Long, isDir: Boolean = false): Long = {
debug(s"check file ${file.getName}")
val shouldDeleteFile =
if (isDir) {
file.listFiles.isEmpty && (System.currentTimeMillis() - file.lastModified() > time)
} else {
System.currentTimeMillis() - file.lastModified() > time
}
val length = if (isDir) 0 else file.length()
if (shouldDeleteFile) {
if (file.delete()) {
debug(s"delete file ${file.getAbsolutePath} success")
return length
} else {
warn(s"delete file ${file.getAbsolutePath} fail")
}
}
0L
}
import scala.sys.process._
private def needToDeepClean(dir: String): Boolean = {
try {
val used = (s"df $dir" #| s"grep $dir").!!
.split(" ").filter(_.endsWith("%"))(0)
.replace("%", "")
info(s"$dir now used $used% space")
used.toInt > (100 - freeSpaceThreshold)
} catch {
case NonFatal(e) =>
error(s"An error occurs when querying the disk $dir capacity, " +
s"return true to make sure the disk space will not overruns: ${e.getMessage}")
true
}
}
private def doCleanJob(dir: String): Unit = {
val startTime = System.currentTimeMillis()
val path = Paths.get(dir)
info(s"start clean job for $dir")
doClean(path.toFile, fileExpiredTime)
// re check if the disk has enough space
if (needToDeepClean(dir)) {
info(s"start deep clean job for $dir")
doClean(path.toFile, deepCleanFileExpiredTime)
if (needToDeepClean(dir)) {
warn(s"after deep clean $dir, used space still higher than $freeSpaceThreshold")
}
}
val finishedTime = System.currentTimeMillis()
info(s"clean job $dir finished, elapsed time: ${(finishedTime - startTime) / 1000} s")
}
def main(args: Array[String]): Unit = {
do {
info(s"start all clean job")
val startTime = System.currentTimeMillis()
val hasFinished = new CountDownLatch(cacheDirs.length)
cacheDirs.foreach { dir =>
threadPool.execute(() => {
try {
doCleanJob(dir)
} catch {
case NonFatal(e) =>
error(s"failed to clean dir: $dir", e)
} finally {
hasFinished.countDown()
}
})
}
hasFinished.await()
val usedTime = System.currentTimeMillis() - startTime
info(s"finished to clean all dir, elapsed time ${usedTime / 1000} s")
if (usedTime > scheduleInterval) {
warn(s"clean job elapsed time $usedTime which is greater than $scheduleInterval")
} else {
Thread.sleep(scheduleInterval - usedTime)
}
} while (!isTesting)
}
}
object KubernetesSparkBlockCleanerConstants {
val CACHE_DIRS_KEY = "CACHE_DIRS"
val FILE_EXPIRED_TIME_KEY = "FILE_EXPIRED_TIME"
val FREE_SPACE_THRESHOLD_KEY = "FREE_SPACE_THRESHOLD"
val SCHEDULE_INTERVAL = "SCHEDULE_INTERVAL"
val DEEP_CLEAN_FILE_EXPIRED_TIME_KEY = "DEEP_CLEAN_FILE_EXPIRED_TIME"
}

tools/spark-block-cleaner/src/test/scala/org/apache/kyuubi/tools/KubernetesSparkBlockCleanerSuite.scala

@@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.tools
import java.io.File
import java.nio.file.Files
import java.util.{Map => JMap}
import java.util.UUID
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.util.reflect.ReflectUtils._
class KubernetesSparkBlockCleanerSuite extends KyuubiFunSuite {
import KubernetesSparkBlockCleanerConstants._
private val rootDir = Utils.createTempDir()
private val cacheDir = Seq("1", "2").map(rootDir.resolve)
private val block1 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}")
private val block2 = new File(cacheDir.head.toFile, s"blockmgr-${UUID.randomUUID.toString}")
// do not remove
private val subDir1 = new File(block1, "01")
// do not remove
private val data11 = new File(subDir1, "shuffle_0_0_0")
// remove
private val data12 = new File(subDir1, "shuffle_0_0_1")
// remove
private val subDir2 = new File(block2, "02")
// remove
private val data21 = new File(subDir1, "shuffle_0_1_0")
private def deleteRecursive(path: File): Unit = {
path.listFiles.foreach { f =>
if (f.isDirectory) {
deleteRecursive(f)
} else {
f.delete()
}
}
path.delete()
}
override def beforeAll(): Unit = {
super.beforeAll()
cacheDir.foreach(Files.createDirectories(_))
// create some dir
Files.createDirectories(block1.toPath)
// hash sub dir
Files.createDirectory(subDir1.toPath)
data11.createNewFile()
data11.setLastModified(System.currentTimeMillis() - 10)
data12.createNewFile()
Files.write(data12.toPath, "111".getBytes())
data12.setLastModified(System.currentTimeMillis() - 10000000)
Files.createDirectories(block2.toPath)
Files.createDirectory(subDir2.toPath)
subDir2.setLastModified(System.currentTimeMillis() - 10000000)
data21.createNewFile()
data21.setLastModified(System.currentTimeMillis() - 10000000)
}
override def afterAll(): Unit = {
deleteRecursive(block1)
deleteRecursive(block2)
super.afterAll()
}
private def updateEnv(name: String, value: String): Unit = {
getField[JMap[String, String]](System.getenv, "m").put(name, value)
}
test("test clean") {
updateEnv(CACHE_DIRS_KEY, cacheDir.mkString(","))
updateEnv(FILE_EXPIRED_TIME_KEY, "600")
updateEnv(SCHEDULE_INTERVAL, "1")
updateEnv("kyuubi.testing", "true")
KubernetesSparkBlockCleaner.main(Array.empty)
assert(block1.exists())
assert(subDir1.exists())
assert(data11.exists())
assert(!data12.exists())
assert(block2.exists())
assert(!subDir2.exists())
assert(!data21.exists())
}
}