I am trying to handle real-time Kafka data streaming using PySpark. I have a table that is updated in real time; whenever new content lands in the table, I need to aggregate it and stream the count to another consumer. When I try this I get the error shown below.
I went through a lot of references but couldn't find a solution.
Could anyone please help me resolve it?
My main reference was Handling real-time Kafka data streams using PySpark.
def func_count(df):
    dic_new = {}
    rows_count = df.count()
    if rows_count != 0:
        df_count = df.filter(df.c == 1)\
            .groupBy('a', 'b').count('c').alias('count')
        print("row count:", rows_count)
        dic_new[df['a']] = df_count.to_dict(orient='records')
    return df_count, rows_count

selected_col = df_predict.select('a', 'b', 'c')
result, rows_count = func_count(selected_col)

result_1 = result.selectExpr(
        "CAST(a AS STRING)",
        "CAST(b AS STRING)",
        "CAST(count AS STRING)",
    )\
    .withColumn("value", to_json(struct("*")).cast("string"))

result2_1 = result_1 \
    .select("value") \
    .writeStream \
    .trigger(processingTime="5 seconds") \
    .outputMode("complete") \
    .format("kafka") \
    .option("topic", "send_data") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .start() \
    .awaitTermination()
ERROR:
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
<ipython-input-96-c68859698611> in <module>
15 selected_col = df_predict.select('a', 'b','c')
16
---> 17 result, rows_count = func_count(selected_col)
18
19
<ipython-input-96-c68859698611> in func_count(df)
4 def func_count(df):
5 dic_new = {}
----> 6 rows_count = df.count()
7 if rows_count != 0:
      8             df_count = df.filter(df.c == 1)\
~/.local/lib/python3.8/site-packages/pyspark/sql/dataframe.py in count(self)
583 2
584 """
--> 585 return int(self._jdf.count())
586
587 @ignore_unicode_prefix
~/.local/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
~/.local/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
~/.local/lib/python3.8/site-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
So, this dataframe (per the error message) is from a streaming source:

    selected_col = df_predict.select('a', 'b', 'c')

When you call:

    result, rows_count = func_count(selected_col)

you in turn call:

    df.count()

on the resulting dataframe. count() is one of the operations that is not supported on streaming DataFrames. Per:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
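For intuition, here is a plain-Python analog of what the intended aggregation computes, with made-up rows standing in for the streaming table and Counter standing in for Spark's grouping state:

```python
import json
from collections import Counter

# Hypothetical rows mirroring the a, b, c columns of selected_col.
rows = [
    {"a": "x", "b": "y", "c": 1},
    {"a": "x", "b": "y", "c": 1},
    {"a": "x", "b": "z", "c": 0},  # filtered out: c != 1
    {"a": "w", "b": "y", "c": 1},
]

# filter(c == 1) followed by groupBy('a', 'b').count()
counts = Counter((r["a"], r["b"]) for r in rows if r["c"] == 1)

# to_json(struct("*")): one JSON string per group, like the Kafka "value" column
values = [
    json.dumps({"a": a, "b": b, "count": str(n)})
    for (a, b), n in sorted(counts.items())
]
for v in values:
    print(v)
```

The difference in Structured Streaming is that this computation is never executed eagerly; it is declared lazily and only runs once a sink is started via writeStream.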
It's also a little unclear what you're really trying to accomplish, since you're doing a lot of work to build a dic_new and get a rows_count that you don't appear to use. It seems like instead of:

    result, rows_count = func_count(selected_col)

you could just be calling:

    df_count = selected_col.filter(selected_col.c == 1).groupBy('a', 'b').count()

(note that GroupedData.count() takes no arguments and already names the resulting column count, so count('c').alias('count') would fail anyway). It's also a bit unclear without context why you're checking whether the row count is zero in the first place.
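Putting it together: the count check and the eager func_count call go away, and the whole pipeline becomes one lazy streaming query that is executed by writeStream.start(). A minimal sketch, assuming df_predict is your streaming DataFrame, a broker at localhost:9092, and the spark-sql-kafka package on the classpath (the checkpoint path here is made up; the Kafka sink requires one):

```python
from pyspark.sql.functions import col, struct, to_json

counts = (df_predict
          .select('a', 'b', 'c')
          .filter(col('c') == 1)
          .groupBy('a', 'b')
          .count())  # adds a "count" column; takes no arguments

query = (counts
         .selectExpr("CAST(a AS STRING)",
                     "CAST(b AS STRING)",
                     "CAST(count AS STRING)")
         .withColumn("value", to_json(struct("*")))
         .select("value")
         .writeStream
         .trigger(processingTime="5 seconds")
         .outputMode("complete")  # valid here because the query aggregates
         .format("kafka")
         .option("kafka.bootstrap.servers", "localhost:9092")
         .option("topic", "send_data")
         .option("checkpointLocation", "/tmp/kafka_count_ckpt")  # hypothetical path
         .start())

query.awaitTermination()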