Convert WrappedArray into DataFrame Columns


I have an Avro file with the schema shown below, and I would like to convert the WrappedArray values into separate columns:

root
 |-- reportId: string (nullable = true)
 |-- row: string (nullable = true)
 |-- col: struct (nullable = true)
 |    |-- fieldRec: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- value: string (nullable = true)

 

// Read the Avro file and select only the nested "value" field
val empDF = spark.read.format("avro").load("/PRD/DRV/EMP_DETAILS/fact_date=20220404")
                 .select("col.fieldRec.value")
// The selected "value" column holds a WrappedArray in every row
empDF.show()

The result looks like:

+------------------------------------+
| value                              |
+------------------------------------+
|WrappedArray(EmployeeId,Name,salary)|
|WrappedArray(100,James,300000)      |
|WrappedArray(101,John,400000)       |
|WrappedArray(102,Fern,350000)       |
|WrappedArray(103,Philip,310000)     |
+------------------------------------+

This is how I currently turn the values of each WrappedArray into individual columns:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions.{col, size}  // needed for size(col("value")) below
import spark.implicits._

// Case class describing the target columns
case class Employee(employeeId: String, name: String, salary: String)

// Read the Avro file and select only the nested "value" field
val empDF = spark.read.format("avro").load("/PRD/DRV/EMP_DETAILS/fact_date=20220404")
                 .select("col.fieldRec.value")

// Take the first record of the DataFrame (the header row)
val firstRec = empDF.first

// Filter out the header row
val empDFMinus = empDF.filter(row => row != firstRec)

// Highest index of the array, which could be used at a later stage (not used below)
val arrayLength = empDF.select(size(col("value"))).first.getInt(0) - 1

// Convert each WrappedArray into the Employee structure
val empDFCols = empDFMinus.as[ArrayBuffer[String]].map(arr => Employee(arr(0), arr(1), arr(2)))

empDFCols.printSchema()

empDFCols.show()

root
 |-- employeeId: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)

+----------+------+------+
|employeeId|name  |salary|
+----------+------+------+
|100       |James |300000|
|101       |John  |400000|
|102       |Fern  |350000|
|103       |Philip|310000|
+----------+------+------+

Is there a better way to do this?
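
One alternative that may be worth trying is to stay in the DataFrame API and pull each array position out with Column.getItem, using the header row for the column names instead of a hard-coded case class. The following is only a minimal sketch under stated assumptions: valuesDF is a hypothetical in-memory stand-in for the result of .select("col.fieldRec.value"), the first row returned is assumed to be the header, and no data row is assumed to start with the same text as the header's first element.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session only so the sketch is self-contained; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wrapped-array-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the DataFrame produced by .select("col.fieldRec.value").
val valuesDF = Seq(
  Seq("EmployeeId", "Name", "salary"),
  Seq("100", "James", "300000"),
  Seq("101", "John", "400000")
).toDF("value")

// Assumption: the first row returned holds the column names.
val header: Seq[String] = valuesDF.first.getSeq[String](0)

// One Column expression per array position, aliased with the matching header name.
val columns = header.indices.map(i => col("value").getItem(i).as(header(i)))

// Assumption: no data row starts with the same text as the header's first element.
val result = valuesDF
  .filter(col("value").getItem(0) =!= header.head)
  .select(columns: _*)

result.printSchema()
result.show(false)
```

Because the column list is built from the header, the same code should cope with a different number of fields without editing a case class. The trade-off is that the result is an untyped DataFrame rather than a Dataset[Employee], and every column stays a string unless it is cast explicitly.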
