I am using HoodieDeltaStreamer to consume from Kafka and write the data to a Hudi table.
Hudi version: 0.10.1
Spark: 3.2.4
Hadoop: 3.3.5
Only one spark-submit job is running
Command:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  "file://$HUDI_HOME/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar" \
  --continuous \
  --table-type COPY_ON_WRITE \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --source-ordering-field submit_date \
  --target-base-path "hdfs://172.16.0.132:9000/data-lake/raw-zone/tables/temp_cow" \
  --target-table temp_cow \
  --props "file://$HUDI_HOME/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source-Table_May_live_v3.properties" \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-limit 50000 \
  > /data/data-lake/logs/pull_from_kafka_topic1.log 2>&1 &
Stack trace:
2023-06-27 12:23:37,238 INFO sources.AvroKafkaSource: About to read 0 from Kafka for topic :Table_May_live_v3
2023-06-27 12:23:37,238 INFO deltastreamer.DeltaSync: No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Option{val=Table_May_live_v3,0:200000}). New Checkpoint=(TempLMSTable_May_live_v3,0:200000)
2023-06-27 12:23:37,240 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:494)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1729)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1752)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1749)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1764)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.refreshTimeline(DeltaSync.java:244)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:288)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:640)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
A "
Filesystem closed" typically means the Hadoop FileSystem instance being used by your application was closed while the application was still trying to use it.You can see an example of this error with
apache/hudiissue 227That particular error was supposed to be fixed with PR 620, Hudi 0.5.2+.
In general, try to avoid sharing FileSystem instances between threads (see the sketch after this list):

- If possible, each thread should have its own FileSystem instance. This can help avoid concurrency issues.
- If your application detects that the FileSystem instance has been closed (e.g., by catching the IOException), it could try to recreate it.
- If you're using Java, consider using try-with-resources or similar constructs to ensure that resources are cleaned up properly.
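A minimal sketch of those mitigations, assuming plain Hadoop client code (FileSystem.newInstance and the fs.hdfs.impl.disable.cache key are standard Hadoop APIs; the URI, path, and method names here are placeholders for illustration):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsRecreateSketch {

    // Option 1: bypass the shared cache entirely.
    // FileSystem.newInstance() always returns a fresh, uncached instance,
    // so closing it cannot affect other threads.
    static FileSystem freshInstance(Configuration conf) throws IOException {
        return FileSystem.newInstance(URI.create("hdfs://172.16.0.132:9000"), conf);
    }

    // Option 2: disable caching for the hdfs scheme via configuration,
    // so every FileSystem.get() call returns a new instance.
    static Configuration noCacheConf() {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        return conf;
    }

    // Option 3: detect a closed instance and recreate it.
    static FileSystem ensureOpen(FileSystem fs, Configuration conf, Path probe) throws IOException {
        try {
            fs.exists(probe); // any call fails fast if the instance is closed
            return fs;
        } catch (IOException e) {
            if ("Filesystem closed".equals(e.getMessage())) {
                return freshInstance(conf); // recreate and carry on
            }
            throw e; // a different I/O problem; don't mask it
        }
    }
}

For a Spark job like the one above, the equivalent of option 2 should be passing --conf spark.hadoop.fs.hdfs.impl.disable.cache=true to spark-submit, since spark.hadoop.* properties are copied into the Hadoop Configuration used by the executors and driver.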