Create a dataframe out of dbutils.fs.ls output in Databricks

11k Views Asked by At

So, I'm a beginner and learning spark programming (pyspark) on Databricks -

What am I trying to do ?

List all the files in a directory and save it into a dataframe so that I am able to apply filter, sort etc on this list of files. Why ? Because I am trying to find the biggest file in my directory.

Why doesn't below work ? What am I missing ?

from pyspark.sql.types import StringType

sklist = dbutils.fs.ls(sourceFile)

df = spark.createDataFrame(sklist,StringType())
4

There are 4 best solutions below

0
skrprince On

ok, actually, I figured it out :). Just wanna leave the question here incase some one benefits from it.

So basically, the problem was with the schema. Not all the elements in the list was of String Type. So I explicitly created a schema and used it in createDataFrame function.

Working code -

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ddlSchema = StructType([
StructField('path',StringType()),
StructField('name',StringType()),
StructField('size',IntegerType())
])

sklist = dbutils.fs.ls(sourceFile)
df = spark.createDataFrame(sklist,ddlSchema)
0
OlegK On

Updating the answer by @skrprince. The schema has a new field "modtime" now that uses Unix epoch values. It's best to use LongType() for both size and modtime, since IntegerType will fail on larger values.

fslsSchema = StructType(
  [
    StructField('path', StringType()),
    StructField('name', StringType()),
    StructField('size', LongType()),
    StructField('modtime', LongType())
  ]
)

filelist = dbutils.fs.ls('<your path here>')
df_files = spark.createDataFrame(filelist, fslsSchema)

You can also create a temporary view to execute SQL queries against your dataframe data:

df_files.createTempView("files_view")

Then you can run queries in the same notebook like the example below:

%sql
SELECT name, size, modtime
FROM files_view
WHERE name LIKE '<your file pattern>%.parq'
ORDER BY modtime
0
Henrique Florencio On

You don't need to set the schema:

df = spark.createDataFrame(dbutils.fs.ls(sourceFile))
0
Julaayi On

Adding complete code based on @OlegK code since it was throwing me errors when ran.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType
#------
root_dir = 'abfss://myPath'

fslsSchema = StructType(
    [
        StructField('path', StringType()),
        StructField('name', StringType()),
        StructField('size', LongType()),
        StructField('modtime', LongType())
    ]
)

filelist = dbutils.fs.ls(root_dir)
df_files = spark.createDataFrame(filelist, fslsSchema)

df_files.createOrReplaceTempView("files_view") #Replacing view so that I can run as many times as needed

And then one additional thing in the SQL.

SELECT REPLACE(name, '/', ''), size, modtime
FROM files_view
WHERE name LIKE '<your file pattern>%.parq'
ORDER BY modtime;