Same instance shared across multiple classes for a lazy val declared in a trait?


My tests fail intermittently, and after some debugging I observed that the spark val declared in the trait is being shared across two separate suites (I came to this conclusion because the same hashCode is printed in beforeAll in the log). Sharing the base classes, the two test suites, and the log below. What am I missing?

UPDATE:

When I removed the var tableDeltaDf: DataFrame = spark.emptyDataFrame from DataValidationTest and MetadataTest, the issue went away. I am still not clear on why.
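For reference, here is a minimal, Spark-free sketch (trait and class names are made up for illustration) showing that a lazy val declared in a trait is stored per mixed-in instance. Two suite instances would normally get two distinct values, so any sharing has to come from what the initializer itself returns, not from the lazy val mechanism:

```scala
// A lazy val in a trait is backed by a field in each class that mixes it in.
trait ResourceWrapper {
  lazy val resource: AnyRef = new Object
}

class SuiteA extends ResourceWrapper
class SuiteB extends ResourceWrapper

object LazyValDemo {
  def main(args: Array[String]): Unit = {
    val a = new SuiteA
    val b = new SuiteB
    // Two instances -> two distinct lazy val values.
    println(a.resource eq b.resource) // prints false
  }
}
```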

Base trait and class

trait SparkSessionTestWrapper {
  lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
}
    
abstract class SparkTestBaseSpec extends AnyFunSuite
  with SparkSessionTestWrapper with DatasetComparer
  with Logging
  with Matchers with OptionValues with Inside with Inspectors with BeforeAndAfterAll {

  override protected def beforeAll(): Unit = {
    println("beforeAll spark.hashCode: " + spark.hashCode())
    logInfo("beforeAll spark.isStopped: " + spark.sparkContext.isStopped)
  }

  override protected def afterAll(): Unit = {
    println("afterAll spark.hashCode: " + spark.hashCode())
    spark.stop()
    logInfo("afterAll spark.isStopped: " + spark.sparkContext.isStopped)
  }
}

TestSpec 1 & 2

import org.apache.spark.sql.DataFrame
import org.scalatest.BeforeAndAfterAll

class MetadataTest extends SparkTestBaseSpec with BeforeAndAfterAll {
  var tableDeltaDf: DataFrame = spark.emptyDataFrame

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    tableDeltaDf = createNewDF() // removed the def for brevity
  }

  test("test 1") {
  }
}

class DataValidationTest extends SparkTestBaseSpec with BeforeAndAfterAll {
  var tableDeltaDf: DataFrame = spark.emptyDataFrame

  override protected def beforeAll(): Unit = {
    super.beforeAll()
    tableDeltaDf = createNewDF() // removed the def for brevity
  }

  test("test 2") {
  }
}

IntelliJ IDEA Log

##teamcity[testSuiteStarted name='DataValidationTest' locationHint='scalatest://TopOfClass:DataValidationTestTestName:DataValidationTest' captureStandardOutput='true' nodeId='36' parentNodeId='0']
beforeAll spark.hashCode: 2024860730
afterAll spark.hashCode: 2024860730
beforeAll spark.hashCode: 2024860730
##teamcity[message text='Exception encountered when invoking run on a nested suite - Cannot call methods on a stopped SparkContext.|nThis stopped SparkContext was created at:|n|norg.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark(SparkSessionTestWrapper.scala:23)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark$(SparkSessionTestWrapper.scala:10)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark$lzycompute(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.beforeAll(SparkTestBaseSpec.scala:18)|nCustomerSatisfactionSparkAppTest.beforeAll(CustomerSatisfactionSparkAppTest.scala:17)|norg.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)|norg.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)|norg.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.run(SparkTestBaseSpec.scala:12)|norg.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)|nscala.collection.immutable.List.foreach(List.scala:431)|norg.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)|norg.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)|norg.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)|n|nThe currently active SparkContext was created at:|n|n(No active SparkContext.)|n         ' status='ERROR' errorDetails='java.lang.IllegalStateException: Cannot call methods on a 
stopped SparkContext.|nThis stopped SparkContext was created at:|n|norg.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark(SparkSessionTestWrapper.scala:23)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark$(SparkSessionTestWrapper.scala:10)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark$lzycompute(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.beforeAll(SparkTestBaseSpec.scala:18)|nCustomerSatisfactionSparkAppTest.beforeAll(CustomerSatisfactionSparkAppTest.scala:17)|norg.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)|norg.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)|norg.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.run(SparkTestBaseSpec.scala:12)|norg.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)|nscala.collection.immutable.List.foreach(List.scala:431)|norg.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)|norg.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)|norg.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)|n|nThe currently active SparkContext was created at:|n|n(No active SparkContext.)|n         |n   at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)|n at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2510)|n  at 
org.apache.spark.SparkContext.defaultMinPartitions(SparkContext.scala:2519)|n    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.createBaseRdd(JsonDataSource.scala:188)|n   at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.infer(JsonDataSource.scala:161)|n   at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:65)|n    at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)|n    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)|n at scala.Option.orElse(Option.scala:447)|n  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)|n at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)|n    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)|n   at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)|n    at scala.Option.getOrElse(Option.scala:189)|n   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)|n   at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:405)|n   at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:386)|n   at com.tccc.dna.supplychain.cs.files.JsonFile$.readFile(JsonFile.scala:13)|n    at com.tccc.dna.supplychain.cs.init.ApplicationConfig$.loadConfig(ApplicationConfig.scala:18)|n at DataValidationTest.beforeAll(DataValidationTest.scala:21)|n  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)|n at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)|n   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)|n  at com.tccc.dna.synapse.spark.test.SparkTestBaseSpec.run(SparkTestBaseSpec.scala:12)|n  at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)|n  at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)|n   at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)|n   at scala.collection.immutable.List.foreach(List.scala:431)|n    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)|n   at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)|n    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)|n    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)|n  at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)|n    at org.scalatest.tools.Runner$.run(Runner.scala:798)|n  at org.scalatest.tools.Runner.run(Runner.scala)|n   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)|n at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)|n']

##teamcity[testSuiteStarted name='MetaDataTest' locationHint='scalatest://TopOfClass:MetaDataTestTestName:MetaDataTest' captureStandardOutput='true' nodeId='37' parentNodeId='36']
afterAll spark.hashCode: 2024860730
##teamcity[message text='Exception encountered when invoking run on a nested suite - Cannot call methods on a stopped SparkContext.|nThis stopped SparkContext was created at:|n|norg.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark(SparkSessionTestWrapper.scala:23)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark$(SparkSessionTestWrapper.scala:10)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark$lzycompute(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.beforeAll(SparkTestBaseSpec.scala:18)|nCustomerSatisfactionSparkAppTest.beforeAll(CustomerSatisfactionSparkAppTest.scala:17)|norg.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)|norg.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)|norg.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.run(SparkTestBaseSpec.scala:12)|norg.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)|nscala.collection.immutable.List.foreach(List.scala:431)|norg.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)|norg.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)|norg.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)|n|nThe currently active SparkContext was created at:|n|n(No active SparkContext.)|n         ' status='ERROR' errorDetails='java.lang.IllegalStateException: Cannot call methods on a 
stopped SparkContext.|nThis stopped SparkContext was created at:|n|norg.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark(SparkSessionTestWrapper.scala:23)|ncom.tccc.dna.synapse.spark.test.SparkSessionTestWrapper.spark$(SparkSessionTestWrapper.scala:10)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark$lzycompute(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.spark(SparkTestBaseSpec.scala:12)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.beforeAll(SparkTestBaseSpec.scala:18)|nCustomerSatisfactionSparkAppTest.beforeAll(CustomerSatisfactionSparkAppTest.scala:17)|norg.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)|norg.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)|norg.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)|ncom.tccc.dna.synapse.spark.test.SparkTestBaseSpec.run(SparkTestBaseSpec.scala:12)|norg.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)|norg.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)|nscala.collection.immutable.List.foreach(List.scala:431)|norg.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)|norg.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)|norg.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)|norg.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)|n|nThe currently active SparkContext was created at:|n|n(No active SparkContext.)|n         |n   at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:118)|n at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2510)|n  at 
org.apache.spark.SparkContext.defaultMinPartitions(SparkContext.scala:2519)|n    at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.createBaseRdd(JsonDataSource.scala:188)|n   at org.apache.spark.sql.execution.datasources.json.MultiLineJsonDataSource$.infer(JsonDataSource.scala:161)|n   at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:65)|n    at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)|n    at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)|n at scala.Option.orElse(Option.scala:447)|n  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)|n at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)|n    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)|n   at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)|n    at scala.Option.getOrElse(Option.scala:189)|n   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)|n   at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:405)|n   at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:386)|n   at com.tccc.dna.supplychain.cs.files.JsonFile$.readFile(JsonFile.scala:13)|n    at com.tccc.dna.supplychain.cs.init.ApplicationConfig$.loadConfig(ApplicationConfig.scala:18)|n at MetaDataTest.beforeAll(MetaDataTest.scala:28)|n  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)|n at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)|n   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)|n  at com.tccc.dna.synapse.spark.test.SparkTestBaseSpec.run(SparkTestBaseSpec.scala:12)|n  at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)|n  at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)|n   at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)|n   at scala.collection.immutable.List.foreach(List.scala:431)|n    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)|n   at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)|n    at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)|n    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)|n  at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)|n    at org.scalatest.tools.Runner$.run(Runner.scala:798)|n  at org.scalatest.tools.Runner.run(Runner.scala)|n   at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)|n at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)|n']
1 Answer

Gastón Schabas:

I think the problem is the call to getOrCreate on SparkSession.Builder.

trait SparkSessionTestWrapper {
  lazy val spark: SparkSession = 
    SparkSession
      .builder()
      .getOrCreate() // <- here
}

If you look at the scaladoc, it says:

    /**
     * Gets an existing [[SparkSession]] or, if there is no existing one, creates a new
     * one based on the options set in this builder.
     *
     * This method first checks whether there is a valid thread-local SparkSession,
     * and if yes, return that one. It then checks whether there is a valid global
     * default SparkSession, and if yes, return that one. If no valid global default
     * SparkSession exists, the method creates a new SparkSession and assigns the
     * newly created SparkSession as the global default.
     *
     * In case an existing SparkSession is returned, the non-static config options specified in
     * this builder will be applied to the existing SparkSession.
     *
     * @since 2.0.0
     */
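In other words, two independent builder calls in the same JVM hand back the same global default session, which is exactly why both suites print the same hashCode. A small sketch of that behaviour (assumes spark-sql on the classpath and a local master; not verified against a live cluster):

```scala
import org.apache.spark.sql.SparkSession

object GetOrCreateDemo {
  def main(args: Array[String]): Unit = {
    val s1 = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    // A second builder does not create a new session; it returns the
    // existing global default created above.
    val s2 = SparkSession.builder().getOrCreate()
    println(s1 eq s2) // same JVM-wide default session
    s1.stop()
  }
}
```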

If you want a different SparkSession per suite, you should use newSession, whose docs say:

  /**
   * Start a new session with isolated SQL configurations, temporary tables, registered
   * functions are isolated, but sharing the underlying `SparkContext` and cached data.
   *
   * @note Other than the `SparkContext`, all shared state is initialized lazily.
   * This method will force the initialization of the shared state to ensure that parent
   * and child sessions are set up with the same shared state. If the underlying catalog
   * implementation is Hive, this will initialize the metastore, which may take some time.
   *
   * @since 2.0.0
   */

Note that newSession is a method on a SparkSession instance, not on the builder, so you still need getOrCreate first. Your new SparkSessionTestWrapper should look like:

trait SparkSessionTestWrapper {
  lazy val spark: SparkSession =
    SparkSession
      .builder()
      .getOrCreate()
      .newSession()
}

With that, each suite should get its own session. Keep in mind that the sessions still share the one underlying SparkContext, so calling spark.stop() in afterAll will stop the context for every suite that runs afterwards.