Delta table returns 0 records for Spark Structured Streaming job on Databricks

I am trying to run a Structured Streaming job on a Databricks Spark cluster. I have created two notebooks: the first contains the code for the streaming job, and the second contains a test suite for the first notebook's code.

When I test the number of records in the Delta table, it always contains 0 records. I am consuming two JSON files containing 500 and 501 records respectively, but the Delta table does not have that data. The checkpoint location also shows 0. The full code for both notebooks is below, after a quick sanity-check sketch on the input files.
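
The sanity check (a minimal sketch, not part of the failing notebooks; it reuses the same path and schema) is a static batch read to confirm that the JSON files parse with the schema the stream uses:

# Sanity-check sketch: static batch read of the same directory with the same schema.
# Assumes the streaming-batch notebook has been %run so invoiceStreamBatch is defined.
checker = invoiceStreamBatch()
static_df = (spark.read
                  .format("json")
                  .schema(checker.getSchema())
                  .load(f"{checker.base_data_dir}/data/invoices"))
print(static_df.count())  # expected 500 + 501 = 1001 rows after the first two ingests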

Notebook 1 : streaming-batch

class invoiceStreamBatch():
  def __init__(self):
    self.base_data_dir = "/FileStore/sample_data"


  def getSchema(self):
    return """InvoiceNumber string, CreatedTime bigint, StoreID string, PosID string, CashierID string,
                CustomerType string, CustomerCardNo string, TotalAmount double, NumberOfItems bigint, 
                PaymentMethod string, TaxableAmount double, CGST double, SGST double, CESS double, 
                DeliveryType string,
                DeliveryAddress struct<AddressLine string, City string, ContactNumber string, PinCode string, 
                State string>,
                InvoiceLineItems array<struct<ItemCode string, ItemDescription string, 
                    ItemPrice double, ItemQty bigint, TotalValue double>>
            """

  def readInvoices(self):
    return (spark.readStream
                 .format("json")
                 .schema(self.getSchema())
                 .load("f{self.base_data_dir}/data/invoices}")
           )

  def explodeInvoices(self, invoiceDF):
    return (invoiceDF.selectExpr("InvoiceNumber", "CreatedTime", "StoreID", "PosID",
                                      "CustomerType", "PaymentMethod", "DeliveryType", "DeliveryAddress.City",
                                      "DeliveryAddress.State","DeliveryAddress.PinCode", 
                                      "explode(InvoiceLineItems) as LineItem"))

  def flattenInvoices(self, explodedDF):
    from pyspark.sql.functions import expr
    return( explodedDF.withColumn("ItemCode", expr("LineItem.ItemCode"))
                        .withColumn("ItemDescription", expr("LineItem.ItemDescription"))
                        .withColumn("ItemPrice", expr("LineItem.ItemPrice"))
                        .withColumn("ItemQty", expr("LineItem.ItemQty"))
                        .withColumn("TotalValue", expr("LineItem.TotalValue"))
                        .drop("LineItem")
    )

  def appendInvoices(self, flattenedDF, trigger = "batch"):
    sQuery = (flattenedDF.writeStream
                    .format("delta")
                    .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/invoices")
                    .outputMode("append")
                    .option("maxFilesPerTrigger", 1)                
                )     
    
    if ( trigger == "batch" ):
      return ( sQuery.trigger(availableNow = True)
                     .toTable("invoice_line_items"))
    else:
      print(f"trigger value is {trigger}")
      return ( sQuery.trigger(processingTime = trigger)
                     .toTable("invoice_line_items"))
      
  def printDFData(self, df_name, df):
    row_count = df.count()
    print(f"the total count for dataframe {df_name} is {row_count}")    

  def process(self, trigger = "batch"):
    print(f"Starting Invoice Processing Stream...", end='')
    invoicesDF = self.readInvoices()
    explodedDF = self.explodeInvoices(invoicesDF)
    resultDF = self.flattenInvoices(explodedDF)
    sQuery = self.appendInvoices(resultDF, trigger)
    print("Done\n")
    return sQuery 

Notebook 2: test-suite

Cell 1:

%run ./streaming-batch

Cell 2:

class streamingBatchTestSuite():
  def __init__(self):
    self.base_data_dir = "/FileStore/sample_data"

  def cleanTests(self):
    print(f"Starting Cleanup...", end='')
    spark.sql("drop table if exists invoice_line_items")
    dbutils.fs.rm("/user/hive/warehouse/invoice_line_items", True)
    dbutils.fs.rm(f"{self.base_data_dir}/chekpoint/invoices", True)
    dbutils.fs.rm(f"{self.base_data_dir}/data/invoices", True)
    dbutils.fs.mkdirs(f"{self.base_data_dir}/data/invoices")
    print("Done")

  def ingestData(self, itr):
    print(f"\tStarting Ingestion...", end='')
    dbutils.fs.cp(f"{self.base_data_dir}/datasets/invoices/invoices_{itr}.json", f"{self.base_data_dir}/data/invoices/")
    print("Done") 

  def printIngestedFileData(self, itr):
    df = spark.read.format("json").load(f"dbfs:/FileStore/sample_data/data/invoices/invoices_{itr}.json")
    row_count = df.count()
    print(f"the total count for file with index {itr} is {row_count}")

  def assertResult(self, expected_count):
    print(f"\tStarting validation...", end='')
    actual_count = spark.sql("select count(*) from invoice_line_items").collect()[0][0]
    assert expected_count == actual_count, f"Test failed! actual count is {actual_count}"
    print("Done")

  def waitForMicroBatch(self, sleep=30):
    import time
    print(f"\tWaiting for {sleep} seconds...", end='')
    time.sleep(sleep)
    print("Done.")

  def runBatchTests(self):
    self.cleanTests()
    iStream = invoiceStreamBatch()
    print("Testing first batch of invoice stream...") 
    self.ingestData(1)
    self.printIngestedFileData(1)
    self.ingestData(2)
    self.waitForMicroBatch(30)
    self.printIngestedFileData(2)
    iStream.process("batch")
    self.waitForMicroBatch(30)        
    self.assertResult(2506)
    print("Validation passed.\n")

    print("Testing second batch of invoice stream...") 
    self.ingestData(3)
    iStream.process("batch")
    self.waitForMicroBatch(30)        
    self.assertResult(3990)
    print("Validation passed.\n")

  def runStreamTests(self):
    self.cleanTests()
    iStream = invoiceStreamBatch()
    streamQuery = iStream.process("30 seconds")

    print("Testing first iteration of invoice stream...") 
    self.ingestData(1)
    self.waitForMicroBatch()        
    self.assertResult(1249)
    print("Validation passed.\n")

    print("Testing second iteration of invoice stream...") 
    self.ingestData(2)
    self.waitForMicroBatch()
    self.assertResult(2506)
    print("Validation passed.\n") 

    print("Testing third iteration of invoice stream...") 
    self.ingestData(3)
    self.waitForMicroBatch()
    self.assertResult(3990)
    print("Validation passed.\n")

    streamQuery.stop()

Cell 3:

isTS = streamingBatchTestSuite()
isTS.runStreamTests()
#isTS.runBatchTests()

Output:

Starting Cleanup...Done
Starting Invoice Processing Stream...trigger value is 30 seconds
Done

Testing first iteration of invoice stream...
    Starting Ingestion...Done
    Waiting for 30 seconds...Done.
    Starting validation...Unexpected exception formatting exception. Falling back to standard exception

AssertionError: Test failed! actual count is 0

I get the 0 count in both runStreamTests() and runBatchTests(). I run them one at a time, commenting out the other.
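
A diagnostic that might narrow this down (a minimal sketch, assuming streamQuery is the handle returned by iStream.process(...)) is to inspect the query's progress instead of relying only on a fixed sleep before the assertion:

# Diagnostic sketch, not part of the notebooks above.
# Assumes `streamQuery` is the handle returned by iStream.process(...).
import json

print(streamQuery.status)  # is the query active, and is it waiting for new data?

# Per-micro-batch metrics collected so far; numInputRows == 0 in every entry
# would mean the file source never picked up the ingested JSON files.
for progress in streamQuery.recentProgress:
    print(json.dumps(progress, indent=2))

# For the availableNow ("batch") trigger, blocking until the query finishes
# is more reliable than a fixed sleep before checking the table:
# streamQuery.awaitTermination()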
