Error: Queries with streaming sources must be executed with writeStream.start();; kafka


What I am trying to do is handle real-time Kafka data streams using PySpark. I have a table that gets updated in real time; whenever new content lands in the table, I need to aggregate it and stream the count to another consumer. When I try to do this, I get the error shown below.

I went through a lot of references but couldn't find a solution. Could anyone please help me resolve it?

My main reference was Handling real-time Kafka data streams using PySpark.

def func_count(df):
    dic_new = {}
    rows_count = df.count()
    if rows_count != 0:
        df_count = df.filter(df.c == 1)\
                     .groupBy('a','b').count('c').alias('count')

        print("row count:",rows_count)
        dic_new[df['a']] = df_count.to_dict(orient='records')
    return df_count, rows_count

selected_col = df_predict.select('a', 'b','c')

result, rows_count = func_count(selected_col)


from pyspark.sql.functions import to_json, struct

result_1 = result.selectExpr(
            "CAST(a AS STRING)",
            "CAST(b AS STRING)",
            "CAST(count AS STRING)",
        )\
        .withColumn("value", to_json(struct("*")).cast("string"))


result2_1 = result_1 \
        .select("value") \
        .writeStream \
        .trigger(processingTime="5 seconds") \
        .outputMode("complete") \
        .format("kafka") \
        .option("topic", "send_data") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .start() \
        .awaitTermination()

ERROR:
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-96-c68859698611> in <module>
     15 selected_col = df_predict.select('a', 'b','c')
     16 
---> 17 result, rows_count = func_count(selected_col)
     18 
     19 

<ipython-input-96-c68859698611> in func_count(df)
      4 def func_count(df):
      5     dic_new = {}
----> 6     rows_count = df.count()
      7     if rows_count != 0:
      8         df_count = df.filter(df.c == 1)\

~/.local/lib/python3.8/site-packages/pyspark/sql/dataframe.py in count(self)
    583         2
    584         """
--> 585         return int(self._jdf.count())
    586 
    587     @ignore_unicode_prefix

~/.local/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

~/.local/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    135                 # Hide where the exception came from that shows a non-Pythonic
    136                 # JVM exception message.
--> 137                 raise_from(converted)
    138             else:
    139                 raise

~/.local/lib/python3.8/site-packages/pyspark/sql/utils.py in raise_from(e)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka

1 Answer

Answered by James:

So, this dataframe (per the error message) is from a streaming source: selected_col = df_predict.select('a', 'b', 'c')

When you call: result, rows_count = func_count(selected_col)

You in turn call: df.count() on the resulting dataframe.
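
As a quick sanity check, a streaming DataFrame reports isStreaming as True, so you can guard batch-only actions before they throw. A minimal sketch, assuming selected_col from the question is in scope:

# Streaming DataFrames report isStreaming == True; batch actions such as
# count() or collect() raise AnalysisException on them.
if selected_col.isStreaming:
    print("streaming source: aggregate with groupBy().count() and start a writeStream")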

Per: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.
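
To make the contrast concrete, a minimal sketch, assuming df_predict is the streaming DataFrame from the question:

# Unsupported on a streaming DataFrame -- this is exactly what raises the
# AnalysisException in the traceback above:
# total = df_predict.count()

# Supported: returns a *streaming* Dataset carrying a running count, which
# only materializes once a writeStream query is started.
running_count = df_predict.groupBy().count()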

It's also a little unclear what you're really trying to accomplish, since you're doing a lot of stuff to create a dic_new and get a rows_count that you don't appear to use.

It seems like instead of: result, rows_count = func_count(selected_col)

You could just be calling: df_count = selected_col.filter(selected_col.c == 1).groupBy('a','b').count(). (Note that GroupedData.count() takes no arguments; it always produces a column named count, which is what the later CAST(count AS STRING) expects.)

And it's unclear, without more context, why you're checking whether the row count is zero in the first place.
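
Putting it together, a minimal end-to-end sketch, assuming selected_col = df_predict.select('a', 'b', 'c') from the question, where df_predict is a streaming DataFrame. The topic and bootstrap servers come from the question; the checkpoint path is a placeholder you would replace:

from pyspark.sql.functions import to_json, struct

# Streaming aggregation in place of the eager func_count(): filter, then a
# running count per (a, b). groupBy(...).count() names its result "count".
df_count = (selected_col.filter(selected_col.c == 1)
            .groupBy('a', 'b')
            .count())

# Serialize each row into a single JSON-string "value" column, which is the
# shape the Kafka sink expects.
result_1 = (df_count.selectExpr("CAST(a AS STRING)",
                                "CAST(b AS STRING)",
                                "CAST(count AS STRING)")
            .withColumn("value", to_json(struct("*"))))

# The Kafka sink requires a checkpoint location; the path below is a
# placeholder, not from the question.
(result_1.select("value")
 .writeStream
 .trigger(processingTime="5 seconds")
 .outputMode("complete")
 .format("kafka")
 .option("topic", "send_data")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("checkpointLocation", "/tmp/checkpoints/send_data")
 .start()
 .awaitTermination())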