From 8ac0167b69047b45d43c478dd12c19a275de0624 Mon Sep 17 00:00:00 2001 From: mingji Date: Fri, 8 Apr 2022 15:51:33 +0800 Subject: [PATCH] optimize unit test running time. --- .../rss/service/deploy/SparkTestBase.scala | 11 --- .../deploy/cluster/ClusterReviveTest.scala | 84 ------------------- .../deploy/integration/CombineHashTest.scala | 53 ------------ .../deploy/integration/CombineSortTest.scala | 52 ------------ .../deploy/integration/GroupHashTest.scala | 52 ------------ .../deploy/integration/GroupSortTest.scala | 52 ------------ .../integration/RepartionHashTest.scala | 52 ------------ .../integration/RepartitionSortTest.scala | 52 ------------ .../{SqlHashTest.scala => RssHashTests.scala} | 33 ++++++-- .../{SqlSortTest.scala => RssSortTests.scala} | 37 ++++++-- 10 files changed, 53 insertions(+), 425 deletions(-) delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReviveTest.scala delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineHashTest.scala delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineSortTest.scala delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupHashTest.scala delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupSortTest.scala delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartionHashTest.scala delete mode 100644 server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartitionSortTest.scala rename server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/{SqlHashTest.scala => RssHashTests.scala} (61%) rename server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/{SqlSortTest.scala => RssSortTests.scala} (58%) diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/SparkTestBase.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/SparkTestBase.scala index fee433262..f87842b3f 100644 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/SparkTestBase.scala +++ b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/SparkTestBase.scala @@ -117,26 +117,18 @@ trait SparkTestBase extends Logging with MiniClusterFeature { val resultWithOutRss = inputRdd.combineByKey((k: Int) => (k, 1), (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).collectAsMap() - - Thread.sleep(3000L) - sparkSession.stop() resultWithOutRss } def repartition(sparkSession: SparkSession): collection.Map[Char, Int] = { val inputRdd = sparkSession.sparkContext.parallelize(sampleSeq, 2) val result = inputRdd.repartition(8).reduceByKey((acc, v) => acc + v).collectAsMap() - Thread.sleep(3000L) - sparkSession.stop() result } def groupBy(sparkSession: SparkSession): collection.Map[Char, String] = { val inputRdd = sparkSession.sparkContext.parallelize(sampleSeq, 2) val result = inputRdd.groupByKey().sortByKey().collectAsMap() - - Thread.sleep(3000L) - sparkSession.stop() result.map(k=>(k._1,k._2.toList.sorted.mkString(","))).toMap } @@ -146,9 +138,6 @@ trait SparkTestBase extends Logging with MiniClusterFeature { df.createOrReplaceTempView("tmp") val result = sparkSession.sql("select fa,count(fb) from tmp group by fa order by fa") val outMap = result.collect().map(row => row.getString(0) -> row.getLong(1)).toMap - - Thread.sleep(3000L) - sparkSession.stop() outMap } } diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReviveTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReviveTest.scala deleted file mode 100644 index 30d1f732d..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/cluster/ClusterReviveTest.scala +++ /dev/null @@ -1,84 +0,0 @@ -package com.aliyun.emr.rss.service.deploy.cluster - -import com.aliyun.emr.rss.client.ShuffleClientImpl - -import java.nio.charset.StandardCharsets -import org.apache.commons.lang3.RandomStringUtils -import org.junit.Test -import com.aliyun.emr.rss.client.write.LifecycleManager -import com.aliyun.emr.rss.common.RssConf -import com.aliyun.emr.rss.service.deploy.MiniClusterFeature - -class ClusterReviveTest extends MiniClusterFeature{ - @Test - def testWorkerLost(): Unit ={ - val (worker1, workerRpcEnv1, worker2, workerRpcEnv2, worker3, workerRpcEnv3, worker4, - workerRpcEnv4, workerRpcEnv5, worker5) = setUpMiniCluster(Map("rss.worker.timeout" -> "10s"), - Map("rss.worker.flush.queue.capacity" -> "4", "rss.worker.timeout" -> "10s")) - - val APP1 = "APP-1" - - val clientConf = new RssConf() - clientConf.set("rss.push.data.replicate", "true") - clientConf.set("rss.push.data.buffer.size", "256K") - val metaSystem = new LifecycleManager(APP1, clientConf) - val shuffleClient = new ShuffleClientImpl(clientConf) - shuffleClient.setupMetaServiceRef(metaSystem.self) - - val STR1 = RandomStringUtils.random(1024) - val DATA1 = STR1.getBytes(StandardCharsets.UTF_8) - val OFFSET1 = 0 - val LENGTH1 = DATA1.length - - val dataSize1 = shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - logInfo(s"push data data size ${dataSize1}") - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - worker2.rpcEnv.shutdown() - worker2.stop() - Thread.sleep(10000L) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - Thread.sleep(5000L) - - shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3) - shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3) - - shuffleClient.mapperEnd(APP1, 1, 0, 1, 3) - shuffleClient.mapperEnd(APP1, 1, 1, 1, 3) - shuffleClient.mapperEnd(APP1, 1, 2, 1, 3) - - Thread.sleep(2000L) - shuffleClient.unregisterShuffle(APP1, 1, true) - Thread.sleep(2000L) - } - -} diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineHashTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineHashTest.scala deleted file mode 100644 index 418ead155..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineHashTest.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.emr.rss.service.deploy.integration - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.junit.{AfterClass, BeforeClass, Ignore, Test} - -import com.aliyun.emr.rss.service.deploy.SparkTestBase - -class CombineHashTest extends SparkTestBase { - - @Test - def testHashRss(): Unit = { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") - val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - val resultWithoutRss = combine(sparkSession) - - val resultWithRss = combine(SparkSession.builder() - .config(updateSparkConf(sparkConf, true)).getOrCreate()) - - assert(resultWithoutRss.equals(resultWithRss)) - } -} - -object CombineHashTest extends SparkTestBase { - @BeforeClass - def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") - tuple = setupRssMiniCluster() - } - - @AfterClass - def afterAll(): Unit = { - logInfo("all test complete , stop rss mini cluster") - clearMiniCluster(tuple) - } -} diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineSortTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineSortTest.scala deleted file mode 100644 index 3ab8d927a..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/CombineSortTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.emr.rss.service.deploy.integration - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.junit.{AfterClass, BeforeClass, Ignore, Test} - -import com.aliyun.emr.rss.service.deploy.SparkTestBase - -class CombineSortTest extends SparkTestBase { - @Test - def test: Unit = { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") - val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - val resultWithoutRss = combine(sparkSession) - - val resultWithRss = combine(SparkSession.builder() - .config(updateSparkConf(sparkConf, true)).getOrCreate()) - - assert(resultWithoutRss.equals(resultWithRss)) - } -} - -object CombineSortTest extends SparkTestBase { - @BeforeClass - def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") - tuple = setupRssMiniCluster() - } - - @AfterClass - def afterAll(): Unit = { - logInfo("all test complete , stop rss mini cluster") - clearMiniCluster(tuple) - } -} \ No newline at end of file diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupHashTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupHashTest.scala deleted file mode 100644 index 1b134d344..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupHashTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.emr.rss.service.deploy.integration - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.junit.{AfterClass, BeforeClass, Ignore, Test} - -import com.aliyun.emr.rss.service.deploy.SparkTestBase - -class GroupHashTest extends SparkTestBase { - @Test - def test(): Unit = { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") - val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - val resultWithoutRss = groupBy(sparkSession) - - val resultWithHashRss = groupBy(SparkSession.builder() - .config(updateSparkConf(sparkConf, false)).getOrCreate()) - - assert(resultWithoutRss.equals(resultWithHashRss)) - } -} - -object GroupHashTest extends SparkTestBase { - @BeforeClass - def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") - tuple = setupRssMiniCluster() - } - - @AfterClass - def afterAll(): Unit = { - logInfo("all test complete , stop rss mini cluster") - clearMiniCluster(tuple) - } -} diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupSortTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupSortTest.scala deleted file mode 100644 index 385e29224..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/GroupSortTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.emr.rss.service.deploy.integration - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.junit.{AfterClass, BeforeClass, Ignore, Test} - -import com.aliyun.emr.rss.service.deploy.SparkTestBase - -class GroupSortTest extends SparkTestBase { - @Test - def test(): Unit = { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") - val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - val resultWithoutRss = groupBy(sparkSession) - - val resultWithRss = groupBy(SparkSession.builder() - .config(updateSparkConf(sparkConf, true)).getOrCreate()) - - assert(resultWithoutRss.equals(resultWithRss)) - } -} - -object GroupSortTest extends SparkTestBase { - @BeforeClass - def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") - tuple = setupRssMiniCluster() - } - - @AfterClass - def afterAll(): Unit = { - logInfo("all test complete , stop rss mini cluster") - clearMiniCluster(tuple) - } -} \ No newline at end of file diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartionHashTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartionHashTest.scala deleted file mode 100644 index 2163ac0ad..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartionHashTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.emr.rss.service.deploy.integration - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.junit.{AfterClass, BeforeClass, Ignore, Test} - -import com.aliyun.emr.rss.service.deploy.SparkTestBase - -class RepartionHashTest extends SparkTestBase { - @Test - def test(): Unit = { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") - val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - val resultWithoutRss = repartition(sparkSession) - - val resultWithHashRss = repartition(SparkSession.builder() - .config(updateSparkConf(sparkConf, false)).getOrCreate()) - - assert(resultWithoutRss.equals(resultWithHashRss)) - } -} - -object RepartionHashTest extends SparkTestBase { - @BeforeClass - def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") - tuple = setupRssMiniCluster() - } - - @AfterClass - def afterAll(): Unit = { - logInfo("all test complete , stop rss mini cluster") - clearMiniCluster(tuple) - } -} \ No newline at end of file diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartitionSortTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartitionSortTest.scala deleted file mode 100644 index 53829670f..000000000 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RepartitionSortTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.aliyun.emr.rss.service.deploy.integration - -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import org.junit.{AfterClass, BeforeClass, Ignore, Test} - -import com.aliyun.emr.rss.service.deploy.SparkTestBase - -class RepartitionSortTest extends SparkTestBase { - @Test - def test(): Unit = { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") - val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - val resultWithoutRss = repartition(sparkSession) - - val resultWithSortRss = repartition(SparkSession.builder() - .config(updateSparkConf(sparkConf, true)).getOrCreate()) - - assert(resultWithoutRss.equals(resultWithSortRss)) - } -} - -object RepartitionSortTest extends SparkTestBase { - @BeforeClass - def beforeAll(): Unit = { - logInfo("test initialized , setup rss mini cluster") - tuple = setupRssMiniCluster() - } - - @AfterClass - def afterAll(): Unit = { - logInfo("all test complete , stop rss mini cluster") - clearMiniCluster(tuple) - } -} diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/SqlHashTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RssHashTests.scala similarity index 61% rename from server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/SqlHashTest.scala rename to server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RssHashTests.scala index 69d463446..cb124e2b4 100644 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/SqlHashTest.scala +++ b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RssHashTests.scala @@ -23,23 +23,38 @@ import org.junit.{AfterClass, BeforeClass, Test} import com.aliyun.emr.rss.service.deploy.SparkTestBase -class SqlHashTest extends SparkTestBase { +class RssHashTests extends SparkTestBase { + @Test def test(): Unit = { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - logInfo("run sql test without rss") - val resultWithoutRss = runsql(sparkSession) + val combineResult = combine(sparkSession) + val groupbyResult = groupBy(sparkSession) + val repartitionResult = repartition(sparkSession) + val sqlResult = runsql(sparkSession) - logInfo("run sql test with hash rss") - val resultWithHashRss = runsql(SparkSession.builder() - .config(updateSparkConf(sparkConf, false)).getOrCreate()) + Thread.sleep(3000L) + sparkSession.stop() - assert(resultWithoutRss.equals(resultWithHashRss)) + val rssSparkSession = SparkSession.builder() + .config(updateSparkConf(sparkConf, false)).getOrCreate() + val rssCombineResult = combine(rssSparkSession) + val rssGroupbyResult = groupBy(rssSparkSession) + val rssRepartitionResult = repartition(rssSparkSession) + val rssSqlResult = runsql(rssSparkSession) + + assert(combineResult.equals(rssCombineResult)) + assert(groupbyResult.equals(rssGroupbyResult)) + assert(repartitionResult.equals(rssRepartitionResult)) + assert(combineResult.equals(rssCombineResult)) + assert(sqlResult.equals(rssSqlResult)) + + rssSparkSession.stop() } } -object SqlHashTest extends SparkTestBase { +object RssHashTests extends SparkTestBase { @BeforeClass def beforeAll(): Unit = { logInfo("test initialized , setup rss mini cluster") @@ -52,3 +67,5 @@ object SqlHashTest extends SparkTestBase { clearMiniCluster(tuple) } } + + diff --git a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/SqlSortTest.scala b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RssSortTests.scala similarity index 58% rename from server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/SqlSortTest.scala rename to server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RssSortTests.scala index c59345e86..5566d1bd4 100644 --- a/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/SqlSortTest.scala +++ b/server-worker/src/test/scala/com/aliyun/emr/rss/service/deploy/integration/RssSortTests.scala @@ -22,23 +22,40 @@ import org.apache.spark.sql.SparkSession import org.junit.{AfterClass, BeforeClass, Test} import com.aliyun.emr.rss.service.deploy.SparkTestBase +import com.aliyun.emr.rss.service.deploy.integration.RssHashTests.{clearMiniCluster, logInfo, setupRssMiniCluster, tuple} + +class RssSortTests extends SparkTestBase { -class SqlSortTest extends SparkTestBase { @Test def test(): Unit = { val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() - logInfo("run sql test without rss") - val resultWithoutRss = runsql(sparkSession) + val combineResult = combine(sparkSession) + val groupbyResult = groupBy(sparkSession) + val repartitionResult = repartition(sparkSession) + val sqlResult = runsql(sparkSession) - logInfo("run sql test with hash rss") - val resultWithRss = runsql(SparkSession.builder() - .config(updateSparkConf(sparkConf, true)).getOrCreate()) + Thread.sleep(3000L) + sparkSession.stop() - assert(resultWithoutRss.equals(resultWithRss)) + val rssSparkSession = SparkSession.builder() + .config(updateSparkConf(sparkConf, true)).getOrCreate() + val rssCombineResult = combine(rssSparkSession) + val rssGroupbyResult = groupBy(rssSparkSession) + val rssRepartitionResult = repartition(rssSparkSession) + val rssSqlResult = runsql(rssSparkSession) + + assert(combineResult.equals(rssCombineResult)) + assert(groupbyResult.equals(rssGroupbyResult)) + assert(repartitionResult.equals(rssRepartitionResult)) + assert(combineResult.equals(rssCombineResult)) + assert(sqlResult.equals(rssSqlResult)) + + rssSparkSession.stop() } } -object SqlSortTest extends SparkTestBase { + +object RssSortTests extends SparkTestBase { @BeforeClass def beforeAll(): Unit = { logInfo("test initialized , setup rss mini cluster") @@ -50,4 +67,6 @@ object SqlSortTest extends SparkTestBase { logInfo("all test complete , stop rss mini cluster") clearMiniCluster(tuple) } -} \ No newline at end of file +} + +