I am trying to run the code below with Delta Live Tables.
import dlt
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

# Source view: stream the upstream Delta table, skipping change commits,
# and rename the column to match the target schema.
@dlt.view
def data():
    return (spark.readStream
        .option("skipChangeCommits", "true")
        .format("delta")
        .table("table")
        .withColumnRenamed("col", "col_name")
    )

schema = StructType([
    StructField("col_name", StringType(), True),
])

# Target streaming table for the CDC feed.
dlt.create_streaming_table(
    name="table",
    spark_conf={},
    table_properties={"quality": "bronze"},
    partition_cols=["col"],
    schema=schema
)

# Apply the changes from the source view into the target table (SCD type 1).
dlt.apply_changes(
    target="table",
    source="data",
    keys=["userId"],
    sequence_by=col("col_name"),
    stored_as_scd_type=1
)
While running this, I get the following error:
Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Photon ran out of memory while executing this query. Photon failed to reserve 349.6 MiB for hash table var-len key data, in ParquetDictionaryEncoder, in FileWriterNode(id=34323, output_schema=[]), in task.
The suggestion I found (see the reference below) was that by increasing the memory allocation per executor and reducing the total number of executors, each executor will have more memory available, and that this can be done by setting the spark.databricks.delta.photon.buffer.maxMemory configuration property to a higher value with a line of code added before creating the DLT object.

I have tried the below:
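To show what I mean, this is roughly the kind of line I added near the top of the pipeline notebook, before the DLT definitions. The property name is copied from the suggestion above and I have not been able to verify it in the Databricks documentation, so it may not be a valid setting; the "2g" value is only an example.

# Raise the Photon buffer limit before the DLT view/table definitions run.
# The property name is taken from the suggestion quoted above and may not be
# a real/documented setting; "2g" is an arbitrary example value.
spark.conf.set("spark.databricks.delta.photon.buffer.maxMemory", "2g")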
Results:
Reference: SO link