diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala index 497c73ce5..cd6bc1066 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala @@ -30,7 +30,7 @@ import org.apache.celeborn.common.util.{ThreadUtils, Utils} case class AppDiskUsage(var appId: String, var estimatedUsage: Long) { override def toString: String = - s"Application $appId used approximate ${Utils.bytesToString(estimatedUsage)} " + s"Application $appId used approximate ${Utils.bytesToString(estimatedUsage)}" } class AppDiskUsageSnapShot(val topItemCount: Int) extends Logging with Serializable { @@ -107,7 +107,7 @@ class AppDiskUsageSnapShot(val topItemCount: Int) extends Logging with Serializa s"Snapshot " + s"start ${LocalDateTime.ofInstant(Instant.ofEpochMilli(startSnapShotTime), zoneId)} " + s"end ${LocalDateTime.ofInstant(Instant.ofEpochMilli(endSnapShotTime), zoneId)}" + - s" ${topNItems.filter(_ != null).mkString(",")}" + s" ${topNItems.filter(_ != null).mkString(", ")}" } } @@ -143,13 +143,8 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging { currentSnapShot.get().commit() } currentSnapShot.set(getNewSnapShot()) - val summaryStr = Some(summary()).map(str => - if (str != null && str.nonEmpty) { - "\n" + str - } else { - str - }).getOrElse("") - logInfo(s"App Disk Usage Top$usageCount Report $summaryStr") + val summaryStr = Some(summary()).getOrElse("") + logInfo(s"App Disk Usage Top$usageCount Report: $summaryStr") } }, 60, @@ -168,8 +163,8 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging { val stringBuilder = new StringBuilder() for (i <- 0 until snapshotCount) { if (snapShots(i) != null && snapShots(i).topNItems.exists(_ != null)) { + stringBuilder.append("\n") stringBuilder.append(snapShots(i)) - stringBuilder.append(" \n") } } stringBuilder.toString() diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala index c91d197d7..f0820dd43 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/AppDiskUsageMetricSuite.scala @@ -36,26 +36,47 @@ class AppDiskUsageMetricSuite extends AnyFunSuite val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214, 215) val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314, 315) + def verifySnapShotOutput(snapShot: AppDiskUsageSnapShot, capacity: Int, appCount: Int): Unit = { + val topNItemsEstimatedUsage = snapShot.topNItems + .filter(usage => usage != null) + .map(_.estimatedUsage) + + assert(snapShot.topItemCount == capacity) + assert(topNItemsEstimatedUsage.length == appCount) + assert(topNItemsEstimatedUsage sameElements topNItemsEstimatedUsage.sorted.reverse) + } + test("test snapshot ordering") { + val snapShot = new AppDiskUsageSnapShot(50) + val rand = new Random() + for (i <- 1 to 5) { + snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1) + } + + verifySnapShotOutput(snapShot, 50, 5) + } + + test("test snapshot ordering with capacity") { val snapShot = new AppDiskUsageSnapShot(50) val rand = new Random() for (i <- 1 to 60) { snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1) } - println(snapShot.toString) + + verifySnapShotOutput(snapShot, 50, 50) } test("test snapshot ordering with duplicate entries") { val snapShot = new AppDiskUsageSnapShot(50) val rand = new Random() - for (i <- 1 to 60) { + for (i <- 1 to 10) { snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1) } - for (i <- 1 to 15) { + for (i <- 1 to 10) { snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1000000000) } - println(snapShot.toString) + verifySnapShotOutput(snapShot, 50, 10) } test("test app usage snapshot") {