Databricks Auto Loader gets stuck and does not move on to the next batch


I have a simple job scheduled every 5 minutes. It listens for cloud files on a storage account and writes them into a Delta table; the code is something like this:

# Auto Loader source: pick up new JSON files from the mounted storage path
df = (spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load(input_path, schema=my_schema)
  .select(cols)
  # Delta sink: append whatever arrived, then stop (run-once trigger)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once=True)
  .start(output_path))

Sometimes there are new files, sometimes not. After 40-60 batches it gets stuck on one particular batchId, as if there were no new files in the folder. If I run the script manually I get the same result: it points to the last actually processed batch.

{
  "id" : "xxx,
  "runId" : "xxx",
  "name" : null,
  "timestamp" : "2022-01-13T15:25:07.512Z",
  "batchId" : 64,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 663,
    "triggerExecution" : 1183
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[/mnt/source/]",
    "startOffset" : {
      "seqNum" : 385,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1641982820801,
      "lastBackfillFinishTimeMs" : 1641982823560
    },
    "endOffset" : {
      "seqNum" : 385,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1641982820801,
      "lastBackfillFinishTimeMs" : 1641982823560
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[/mnt/db/table_name]",
    "numOutputRows" : -1
  }
}
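
For reference, a progress report like the one above can also be read from the query handle itself (df in the snippet above is the StreamingQuery returned by .start()):

import json

# df above is the StreamingQuery handle returned by .start()
df.awaitTermination()                         # with trigger(once=True) the run ends on its own
print(json.dumps(df.lastProgress, indent=2))  # latest progress report as a dict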

But if I run only the readStream part, it correctly reads the entire list of files (and starts at a new batchId: 0). The strangest part is that I have absolutely no idea what causes it or why it takes around 40-60 batches to hit this error. Can anyone help, or give me some suggestion? I was thinking about using foreachBatch() to append new data, or using .trigger(continuous='5 minutes').
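
For reference, a foreachBatch version of the same write (assuming the same input_path, my_schema, cols and output_path placeholders as above) would look roughly like the sketch below. It only changes how each micro-batch is written; the Auto Loader source and its file listing stay the same, so it may not address the underlying issue:

def append_batch(batch_df, batch_id):
    # Write this micro-batch to the Delta table explicitly.
    (batch_df
      .select(cols)
      .write
      .format("delta")
      .mode("append")
      .save(output_path))

(spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .load(input_path, schema=my_schema)
  .writeStream
  .foreachBatch(append_batch)
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once=True)
  .start())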

I'm new to Auto Loader.

Thank you so much!


1 Answer

Answered by Maksym:

I resolved it by using

.option('cloudFiles.useIncrementalListing', 'false')

My filenames are composed of a flow name plus a timestamp, like this:

flow_name_2022-01-18T14-19-50.018Z.json

So my guess is that some combination of dots makes RocksDB look in a non-existent directory, which is why it reports "found no new files". Once I disabled incremental listing, RocksDB stopped making its mini checkpoints based on filenames and now reads the whole directory. This is the only explanation I have. If anyone is having the same issue, try changing the filenames.
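
In context, the fix is just the extra option on the stream definition from the question (same placeholder paths and schema); roughly:

df = (spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  # Fall back to full directory listing instead of resuming from the
  # lexicographically last file name tracked by RocksDB.
  .option("cloudFiles.useIncrementalListing", "false")
  .load(input_path, schema=my_schema)
  .select(cols)
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", f"{output_path}/_checkpoint")
  .trigger(once=True)
  .start(output_path))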