I want to use PySpark to read a Kudu table as a stream, process the streaming data, and write the results back into another Kudu table.
Below is the driver code I've been trying:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Kudu-stream") \
    .getOrCreate()

# Read the source Kudu table as a stream
df = spark \
    .readStream \
    .format("org.apache.kudu.spark.kudu") \
    .option("kudu.master", "example:7051") \
    .option("kudu.table", "db.source_test") \
    .load()

# Append each micro-batch to the sink Kudu table
def saveToKudu(batchDF, batchID):
    batchDF.write \
        .format("org.apache.kudu.spark.kudu") \
        .option("kudu.master", "example:7051") \
        .option("kudu.table", "db.sink_test") \
        .mode("append") \
        .save()

query = df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(saveToKudu) \
    .option("checkpointLocation", "hdfs://example:8020/path/to/checkpoint/") \
    .start()

query.awaitTermination()
This is the error message I get when the stream starts:
py4j.protocol.Py4JJavaError: An error occurred while calling o40.load.
: java.lang.UnsupportedOperationException: Data source org.apache.kudu.spark.kudu does not support streamed reading
Is there a way to readStream directly from a Kudu table?
Extra: is it possible to writeStream to Kudu without using foreach or foreachBatch?