Optimize unit test running time.

This commit is contained in:
mingji 2022-04-08 15:51:33 +08:00
parent a7449a9821
commit 8ac0167b69
10 changed files with 53 additions and 425 deletions

View File

@ -117,26 +117,18 @@ trait SparkTestBase extends Logging with MiniClusterFeature {
val resultWithOutRss = inputRdd.combineByKey((k: Int) => (k, 1),
(acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).collectAsMap()
Thread.sleep(3000L)
sparkSession.stop()
resultWithOutRss
}
/**
 * Runs a repartition(8) + reduceByKey job over `sampleSeq` and returns the
 * per-key sums collected on the driver.
 *
 * @param sparkSession session to run the job on; it is stopped before
 *                     returning so each helper leaves a clean JVM for the
 *                     next test.
 * @return map from key to summed value.
 */
def repartition(sparkSession: SparkSession): collection.Map[Char, Int] = {
  val inputRdd = sparkSession.sparkContext.parallelize(sampleSeq, 2)
  val result = inputRdd.repartition(8).reduceByKey(_ + _).collectAsMap()
  // collectAsMap() is a blocking action, so the job has fully completed here;
  // the former Thread.sleep(3000L) added no safety and only inflated the
  // suite's running time.
  sparkSession.stop()
  result
}
/**
 * Runs a groupByKey job over `sampleSeq` and returns, for each key, its
 * values sorted and joined with "," — a deterministic form suitable for
 * equality comparison across shuffle implementations.
 *
 * @param sparkSession session to run the job on; stopped before returning.
 * @return map from key to comma-joined sorted values.
 */
def groupBy(sparkSession: SparkSession): collection.Map[Char, String] = {
  val inputRdd = sparkSession.sparkContext.parallelize(sampleSeq, 2)
  val result = inputRdd.groupByKey().sortByKey().collectAsMap()
  // collectAsMap() already blocks until the job finishes; the previous
  // Thread.sleep(3000L) was dead time removed to speed up the tests.
  sparkSession.stop()
  // Normalize grouped values: iteration order of the grouped Iterable is not
  // guaranteed, so sort before joining to make results comparable.
  result.map { case (key, values) => key -> values.toList.sorted.mkString(",") }.toMap
}
@ -146,9 +138,6 @@ trait SparkTestBase extends Logging with MiniClusterFeature {
df.createOrReplaceTempView("tmp")
val result = sparkSession.sql("select fa,count(fb) from tmp group by fa order by fa")
val outMap = result.collect().map(row => row.getString(0) -> row.getLong(1)).toMap
Thread.sleep(3000L)
sparkSession.stop()
outMap
}
}

View File

@ -1,84 +0,0 @@
package com.aliyun.emr.rss.service.deploy.cluster
import com.aliyun.emr.rss.client.ShuffleClientImpl
import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.RandomStringUtils
import org.junit.Test
import com.aliyun.emr.rss.client.write.LifecycleManager
import com.aliyun.emr.rss.common.RssConf
import com.aliyun.emr.rss.service.deploy.MiniClusterFeature
/**
 * Integration test for the worker-lost / revive path: pushes shuffle data to
 * a 5-worker mini cluster, kills one worker mid-stream, and keeps pushing.
 * With replication enabled, every push after the kill is expected to succeed
 * by reviving onto surviving workers.
 *
 * NOTE(review): the test has no explicit assertions — it passes as long as no
 * pushData/mapperEnd/unregisterShuffle call throws. The Thread.sleep calls
 * are timing-critical (they let the master detect the dead worker), so the
 * statement order must not be changed.
 */
class ClusterReviveTest extends MiniClusterFeature{
@Test
def testWorkerLost(): Unit ={
// Start 5 workers with a short worker timeout (10s) so the master notices
// the killed worker quickly.
val (worker1, workerRpcEnv1, worker2, workerRpcEnv2, worker3, workerRpcEnv3, worker4,
workerRpcEnv4, workerRpcEnv5, worker5) = setUpMiniCluster(Map("rss.worker.timeout" -> "10s"),
Map("rss.worker.flush.queue.capacity" -> "4", "rss.worker.timeout" -> "10s"))
val APP1 = "APP-1"
val clientConf = new RssConf()
// Replication on: losing one worker must not lose already-pushed data.
clientConf.set("rss.push.data.replicate", "true")
clientConf.set("rss.push.data.buffer.size", "256K")
val metaSystem = new LifecycleManager(APP1, clientConf)
val shuffleClient = new ShuffleClientImpl(clientConf)
shuffleClient.setupMetaServiceRef(metaSystem.self)
// 1 KB random payload reused for every push.
val STR1 = RandomStringUtils.random(1024)
val DATA1 = STR1.getBytes(StandardCharsets.UTF_8)
val OFFSET1 = 0
val LENGTH1 = DATA1.length
// Phase 1: push from three map tasks (mapId 0/1/2) while all workers are up.
val dataSize1 = shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
logInfo(s"push data data size ${dataSize1}")
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
// Kill one worker to trigger the revive path on the client side.
worker2.rpcEnv.shutdown()
worker2.stop()
// Wait past rss.worker.timeout (10s) so the master marks worker2 as lost.
Thread.sleep(10000L)
// Phase 2: keep pushing after the loss; these must succeed via revive.
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
// Extra settle time, then a final round of pushes before closing the maps.
Thread.sleep(5000L)
shuffleClient.pushData(APP1, 1, 0, 0, 0, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 1, 0, 1, DATA1, OFFSET1, LENGTH1, 3, 3)
shuffleClient.pushData(APP1, 1, 2, 0, 2, DATA1, OFFSET1, LENGTH1, 3, 3)
// Mark all three map tasks finished.
shuffleClient.mapperEnd(APP1, 1, 0, 1, 3)
shuffleClient.mapperEnd(APP1, 1, 1, 1, 3)
shuffleClient.mapperEnd(APP1, 1, 2, 1, 3)
// Let in-flight commits settle before tearing the shuffle down.
Thread.sleep(2000L)
shuffleClient.unregisterShuffle(APP1, 1, true)
Thread.sleep(2000L)
}
}

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.integration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Ignore, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
/**
 * Verifies that a combineByKey job produces identical results on vanilla
 * Spark and on Spark configured for RSS via updateSparkConf(_, true).
 */
class CombineHashTest extends SparkTestBase {
  @Test
  def testHashRss(): Unit = {
    val baseConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
    // Baseline run on the built-in Spark shuffle.
    val plainSession = SparkSession.builder().config(baseConf).getOrCreate()
    val baseline = combine(plainSession)
    // Same job with RSS enabled; the collected maps must be equal.
    val rssSession = SparkSession.builder()
      .config(updateSparkConf(baseConf, true)).getOrCreate()
    val withRss = combine(rssSession)
    assert(baseline.equals(withRss))
  }
}

/** JUnit lifecycle hooks: one shared RSS mini cluster for the whole class. */
object CombineHashTest extends SparkTestBase {
  // Bring the mini cluster up once before any test runs.
  @BeforeClass
  def beforeAll(): Unit = {
    logInfo("test initialized , setup rss mini cluster")
    tuple = setupRssMiniCluster()
  }
  // Tear the mini cluster down after the last test.
  @AfterClass
  def afterAll(): Unit = {
    logInfo("all test complete , stop rss mini cluster")
    clearMiniCluster(tuple)
  }
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.integration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Ignore, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
/**
 * Verifies that a combineByKey job produces identical results on vanilla
 * Spark and on Spark configured for RSS via updateSparkConf(_, true).
 */
class CombineSortTest extends SparkTestBase {
  @Test
  // Was `def test: Unit` — a side-effecting, 0-arity method should declare
  // empty parens, matching every sibling test class (GroupSortTest, etc.).
  def test(): Unit = {
    val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // Baseline: built-in Spark shuffle.
    val resultWithoutRss = combine(sparkSession)
    // Same job with RSS enabled; results must match.
    val resultWithRss = combine(SparkSession.builder()
      .config(updateSparkConf(sparkConf, true)).getOrCreate())
    assert(resultWithoutRss.equals(resultWithRss))
  }
}

/** JUnit lifecycle hooks: one shared RSS mini cluster for the whole class. */
object CombineSortTest extends SparkTestBase {
  @BeforeClass
  def beforeAll(): Unit = {
    logInfo("test initialized , setup rss mini cluster")
    tuple = setupRssMiniCluster()
  }
  @AfterClass
  def afterAll(): Unit = {
    logInfo("all test complete , stop rss mini cluster")
    clearMiniCluster(tuple)
  }
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.integration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Ignore, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
/**
 * Verifies that a groupByKey job produces identical results on vanilla Spark
 * and on Spark configured via updateSparkConf(_, false) — presumably the
 * hash-based RSS variant; confirm against SparkTestBase.updateSparkConf.
 */
class GroupHashTest extends SparkTestBase {
  @Test
  def test(): Unit = {
    val baseConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
    // Baseline run on the built-in Spark shuffle.
    val plainSession = SparkSession.builder().config(baseConf).getOrCreate()
    val expected = groupBy(plainSession)
    // Same job with the RSS configuration applied.
    val rssSession = SparkSession.builder()
      .config(updateSparkConf(baseConf, false)).getOrCreate()
    val actual = groupBy(rssSession)
    assert(expected.equals(actual))
  }
}

/** JUnit lifecycle hooks: one shared RSS mini cluster for the whole class. */
object GroupHashTest extends SparkTestBase {
  // Start the mini cluster once for all tests in this class.
  @BeforeClass
  def beforeAll(): Unit = {
    logInfo("test initialized , setup rss mini cluster")
    tuple = setupRssMiniCluster()
  }
  // Stop it after the last test completes.
  @AfterClass
  def afterAll(): Unit = {
    logInfo("all test complete , stop rss mini cluster")
    clearMiniCluster(tuple)
  }
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.integration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Ignore, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
/**
 * Verifies that a groupByKey job produces identical results on vanilla Spark
 * and on Spark configured for RSS via updateSparkConf(_, true).
 */
class GroupSortTest extends SparkTestBase {
  @Test
  def test(): Unit = {
    val conf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
    // Baseline on the built-in shuffle.
    val vanilla = SparkSession.builder().config(conf).getOrCreate()
    val expected = groupBy(vanilla)
    // Re-run with the RSS configuration; results must be equal.
    val withRss = groupBy(
      SparkSession.builder().config(updateSparkConf(conf, true)).getOrCreate())
    assert(expected.equals(withRss))
  }
}

/** JUnit lifecycle hooks: one shared RSS mini cluster for the whole class. */
object GroupSortTest extends SparkTestBase {
  @BeforeClass
  def beforeAll(): Unit = {
    logInfo("test initialized , setup rss mini cluster")
    tuple = setupRssMiniCluster()
  }
  @AfterClass
  def afterAll(): Unit = {
    logInfo("all test complete , stop rss mini cluster")
    clearMiniCluster(tuple)
  }
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.integration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Ignore, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
/**
 * Verifies that a repartition job produces identical results on vanilla
 * Spark and on Spark configured via updateSparkConf(_, false).
 *
 * NOTE(review): the class name misspells "Repartition"; it is left as-is
 * because renaming would change the public test identifier.
 */
class RepartionHashTest extends SparkTestBase {
  @Test
  def test(): Unit = {
    val baseConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
    // Baseline run on the built-in Spark shuffle.
    val plainSession = SparkSession.builder().config(baseConf).getOrCreate()
    val expected = repartition(plainSession)
    // Same job with the RSS configuration applied.
    val rssSession = SparkSession.builder()
      .config(updateSparkConf(baseConf, false)).getOrCreate()
    val actual = repartition(rssSession)
    assert(expected.equals(actual))
  }
}

/** JUnit lifecycle hooks: one shared RSS mini cluster for the whole class. */
object RepartionHashTest extends SparkTestBase {
  // Start the mini cluster once for all tests in this class.
  @BeforeClass
  def beforeAll(): Unit = {
    logInfo("test initialized , setup rss mini cluster")
    tuple = setupRssMiniCluster()
  }
  // Stop it after the last test completes.
  @AfterClass
  def afterAll(): Unit = {
    logInfo("all test complete , stop rss mini cluster")
    clearMiniCluster(tuple)
  }
}

View File

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.rss.service.deploy.integration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Ignore, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
/**
 * Verifies that a repartition job produces identical results on vanilla
 * Spark and on Spark configured for RSS via updateSparkConf(_, true).
 */
class RepartitionSortTest extends SparkTestBase {
  @Test
  def test(): Unit = {
    val conf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
    // Baseline on the built-in shuffle.
    val vanilla = SparkSession.builder().config(conf).getOrCreate()
    val expected = repartition(vanilla)
    // Re-run with the RSS configuration; collected maps must be equal.
    val withRss = repartition(
      SparkSession.builder().config(updateSparkConf(conf, true)).getOrCreate())
    assert(expected.equals(withRss))
  }
}

/** JUnit lifecycle hooks: one shared RSS mini cluster for the whole class. */
object RepartitionSortTest extends SparkTestBase {
  @BeforeClass
  def beforeAll(): Unit = {
    logInfo("test initialized , setup rss mini cluster")
    tuple = setupRssMiniCluster()
  }
  @AfterClass
  def afterAll(): Unit = {
    logInfo("all test complete , stop rss mini cluster")
    clearMiniCluster(tuple)
  }
}

View File

@ -23,23 +23,38 @@ import org.junit.{AfterClass, BeforeClass, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
class SqlHashTest extends SparkTestBase {
class RssHashTests extends SparkTestBase {
@Test
def test(): Unit = {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
logInfo("run sql test without rss")
val resultWithoutRss = runsql(sparkSession)
val combineResult = combine(sparkSession)
val groupbyResult = groupBy(sparkSession)
val repartitionResult = repartition(sparkSession)
val sqlResult = runsql(sparkSession)
logInfo("run sql test with hash rss")
val resultWithHashRss = runsql(SparkSession.builder()
.config(updateSparkConf(sparkConf, false)).getOrCreate())
Thread.sleep(3000L)
sparkSession.stop()
assert(resultWithoutRss.equals(resultWithHashRss))
val rssSparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, false)).getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
val rssSqlResult = runsql(rssSparkSession)
assert(combineResult.equals(rssCombineResult))
assert(groupbyResult.equals(rssGroupbyResult))
assert(repartitionResult.equals(rssRepartitionResult))
assert(combineResult.equals(rssCombineResult))
assert(sqlResult.equals(rssSqlResult))
rssSparkSession.stop()
}
}
object SqlHashTest extends SparkTestBase {
object RssHashTests extends SparkTestBase {
@BeforeClass
def beforeAll(): Unit = {
logInfo("test initialized , setup rss mini cluster")
@ -52,3 +67,5 @@ object SqlHashTest extends SparkTestBase {
clearMiniCluster(tuple)
}
}

View File

@ -22,23 +22,40 @@ import org.apache.spark.sql.SparkSession
import org.junit.{AfterClass, BeforeClass, Test}
import com.aliyun.emr.rss.service.deploy.SparkTestBase
import com.aliyun.emr.rss.service.deploy.integration.RssHashTests.{clearMiniCluster, logInfo, setupRssMiniCluster, tuple}
class RssSortTests extends SparkTestBase {
class SqlSortTest extends SparkTestBase {
@Test
def test(): Unit = {
val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[4]")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
logInfo("run sql test without rss")
val resultWithoutRss = runsql(sparkSession)
val combineResult = combine(sparkSession)
val groupbyResult = groupBy(sparkSession)
val repartitionResult = repartition(sparkSession)
val sqlResult = runsql(sparkSession)
logInfo("run sql test with hash rss")
val resultWithRss = runsql(SparkSession.builder()
.config(updateSparkConf(sparkConf, true)).getOrCreate())
Thread.sleep(3000L)
sparkSession.stop()
assert(resultWithoutRss.equals(resultWithRss))
val rssSparkSession = SparkSession.builder()
.config(updateSparkConf(sparkConf, true)).getOrCreate()
val rssCombineResult = combine(rssSparkSession)
val rssGroupbyResult = groupBy(rssSparkSession)
val rssRepartitionResult = repartition(rssSparkSession)
val rssSqlResult = runsql(rssSparkSession)
assert(combineResult.equals(rssCombineResult))
assert(groupbyResult.equals(rssGroupbyResult))
assert(repartitionResult.equals(rssRepartitionResult))
assert(combineResult.equals(rssCombineResult))
assert(sqlResult.equals(rssSqlResult))
rssSparkSession.stop()
}
}
object SqlSortTest extends SparkTestBase {
object RssSortTests extends SparkTestBase {
@BeforeClass
def beforeAll(): Unit = {
logInfo("test initialized , setup rss mini cluster")
@ -50,4 +67,6 @@ object SqlSortTest extends SparkTestBase {
logInfo("all test complete , stop rss mini cluster")
clearMiniCluster(tuple)
}
}
}