How to create an uninitialized DataFrame variable in Scala so that the same variable can be initialized in an if/else condition


I need to create an uninitialized DataFrame variable so that, after assigning a value to it, I can append the (name, DataFrame) pair to a Seq.

var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame

queries.foreach(q=>{
    var view_name = q._1
    var sourceType = q._2
    var query = q._3
    var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame

    if(sourceType == "sqlserver"){
        df = jdbcConn.option("query", query).load()
    }else if(sourceType == "mongodb"){
        var connectionString = connectionInt.setCollection(view_name);
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").
                                option("spark.mongodb.input.partitioner", "MongoSinglePartitioner").
                                option("uri", connectionString).    
                                load();
    }

    df.createOrReplaceTempView(view_name)
    var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
    dataframes = dataframes :+ tup
});

I am getting the following error:

input.scala:102: error: type mismatch;
 found   : Seq[(String, org.apache.spark.sql.DataFrame)]
    (which expands to)  Seq[(String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])]
 required: (String, org.apache.spark.sql.DataFrame)
    (which expands to)  (String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
    var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
                                                                          ^
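For reference, the mismatch comes from the ":+" call itself: "dataframes :+ (view_name, df)" returns a whole new Seq with the pair appended, not a single tuple, so its result cannot be stored in a Tuple2 variable. A minimal sketch of the fix inside the original foreach, assuming "dataframes" is declared as a mutable Seq of pairs before the loop (that declaration is not shown in the question):

var dataframes = Seq.empty[(String, org.apache.spark.sql.DataFrame)]  // assumed declaration

queries.foreach { case (view_name, sourceType, query) =>
    // ... build df exactly as in the code above ...
    df.createOrReplaceTempView(view_name)
    // :+ yields a new Seq; reassign it to the var instead of a Tuple2.
    // Double parentheses pass the pair as a single tuple argument.
    dataframes = dataframes :+ ((view_name, df))
}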

pasha701 (best answer):

Using "map" instead of "foreach" looks better:

val dataframes = queries.map { case (view_name, sourceType, query) =>
  // Pick the source-specific reader; fall back to an empty DataFrame.
  val df: org.apache.spark.sql.DataFrame =
    if (sourceType == "sqlserver") {
      jdbcConn.option("query", query).load()
    } else if (sourceType == "mongodb") {
      val connectionString = connectionInt.setCollection(view_name)
      spark.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.partitioner", "MongoSinglePartitioner")
        .option("uri", connectionString)
        .load()
    } else {
      spark.emptyDataFrame
    }

  // Register the temp view and return the pair; "map" collects the pairs into the Seq.
  df.createOrReplaceTempView(view_name)
  (view_name, df)
}
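
As a usage sketch: since "map" returns a Seq[(String, DataFrame)], the result can be turned into a Map to look up a DataFrame by its view name (the name "view1" below is just a placeholder).

val dfByName = dataframes.toMap              // Map[String, DataFrame]
dfByName.get("view1").foreach(_.show())      // placeholder view name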