I tried creating embedded Elasticsearch nodes in my Spark 3 job using the following (pseudo) code:
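```java
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.elasticsearch.node.Node;

public class ESNode extends Node {
    public static ESNode createNode(String randomDataPath) {
        // create an embedded node using a randomly generated temp data path
        // set as Environment.PATH_HOME_SETTING
    }
}

Dataset<MyType> ds;
ds.mapPartitions((MapPartitionsFunction<MyType, MyType>) part -> {
    ESNode node = ESNode.createNode(randomDataPath);
    // do other stuff with the ES node
    return part; // mapPartitions must return an Iterator
}, Encoders.bean(MyType.class));
```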
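In the snippet above, `randomDataPath` is not defined inside the lambda. If it is meant to differ per task, one option is to create it inside `mapPartitions`, so that every task on a multi-core executor gets its own directory, and to pass it to `createNode` as a local value rather than through any shared field. A minimal sketch of the path creation, using only `java.nio.file`; the class `TaskDataPaths`, the method `newDataPath`, and the partition-id prefix are hypothetical names (inside a Spark task the id could come from, e.g., `TaskContext.getPartitionId()`):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TaskDataPaths {
    /**
     * Creates a brand-new directory under java.io.tmpdir for one task's node.
     * The partition id in the prefix only makes logs easier to read;
     * uniqueness comes from Files.createTempDirectory itself, which always
     * creates a directory that did not exist before.
     */
    static Path newDataPath(int partitionId) {
        try {
            return Files.createTempDirectory("es-node-p" + partitionId + "-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two tasks on the same executor: even with the same prefix, the paths differ.
        Path a = newDataPath(0);
        Path b = newDataPath(0);
        System.out.println(a + " vs " + b);
    }
}
```

The returned `Path` would then be handed straight to the node factory within the same lambda invocation, so no other task can overwrite it.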
If each executor is allocated only one core, so that only one task runs on each executor at a time, the code above runs successfully. If each executor is allocated more than one core, multiple embedded nodes are started with different data paths (verified in the logs). However, when they process requests, they all seem to use the data path of the last node started, which results in:
```
java.lang.IllegalStateException: failed to obtain node locks, tried [[/yarn_container_dir/tmp/data_path_of_last_node]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
```
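One possible way nodes in the same executor could end up sharing a data path is JVM-wide state: concurrent tasks on one executor are threads in the same JVM, so any static field (for example, a hypothetical static in `ESNode` or in a helper that stores the data path) is shared by all of them, and the last task to write it wins. This is a hypothesis, not a confirmed diagnosis of Elasticsearch internals. The plain-Java sketch below involves no Elasticsearch at all; `SharedStateDemo`, `sharedDataPath` and `NodeConfig` are made-up names. It runs several concurrent "tasks" that each set a static field and an instance field to their own path, wait until all have done so, then read both back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedStateDemo {
    // JVM-wide: every task running in this executor JVM sees the same field.
    static volatile String sharedDataPath;

    // Per-instance: each task's own config object keeps its own value.
    static final class NodeConfig {
        private final String dataPath;
        NodeConfig(String dataPath) { this.dataPath = dataPath; }
        String dataPath() { return dataPath; }
    }

    /** Each result is {intended path, path read from static, path read from instance}. */
    static List<String[]> run(int n) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        CyclicBarrier allConfigured = new CyclicBarrier(n);
        List<Future<String[]>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final String myPath = "/tmp/data_path_of_node_" + i;
            futures.add(pool.submit(() -> {
                sharedDataPath = myPath;                  // every task overwrites the shared field
                NodeConfig own = new NodeConfig(myPath);  // instance state stays private
                allConfigured.await();                    // wait until all tasks have written
                return new String[] {myPath, sharedDataPath, own.dataPath()};
            }));
        }
        List<String[]> results = new ArrayList<>();
        for (Future<String[]> f : futures) {
            results.add(f.get());
        }
        pool.shutdown();
        return results;
    }

    public static void main(String[] args) throws Exception {
        for (String[] r : run(4)) {
            System.out.println("intended=" + r[0] + " static=" + r[1] + " instance=" + r[2]);
        }
    }
}
```

Every task reads the same value back from the static field (whichever task wrote last), while the instance field still holds each task's own path. That pattern matches the "data path of the last node" symptom, so checking `ESNode` and any helpers for static fields holding the path may be a cheap first step.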
Does running multiple embedded nodes within the same JVM (i.e. the same executor) cause them to interfere with each other?