I am quite new to Snakemake. I would like to process a set of files in my Snakemake workflow whose names and number I don't know in advance. The list of input files is returned by a function that looks up remote files based on some parameters, for example:
data = dataset(datatype=datatype, eventtype=eventtype, polarity=polarity)
Here data is a list of locations of remote files that I'd like to process. I don't know its length or the file names in advance, since they depend on the parameters passed to the function dataset. The workflow should access the data remotely, process each file locally, and finally merge the results.
I tried using checkpoints, based on the example that I found here, but I am failing to understand how to adapt it to my needs. Here is what I tried:
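To make the intended data flow concrete, here is a rough plain-Python sketch of what I am after, outside of Snakemake. Everything in it is a placeholder: the dataset stub just returns made-up paths, and split, process and merge are hypothetical helpers that only touch or list files rather than doing any real work.

```python
import pathlib
import tempfile


def dataset(datatype, eventtype, polarity):
    """Hypothetical stand-in for the real lookup; returns made-up paths."""
    return [f"remote/{datatype}/{eventtype}/{polarity}/file{i}.root" for i in range(3)]


def split(paths, outdir):
    """Fan-out step: write one small text file per remote path."""
    outdir = pathlib.Path(outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    for number, remote in enumerate(paths):
        (outdir / f"{number}.txt").write_text(remote)
    return outdir


def process(job_file):
    """Placeholder processing: create an empty output next to the job file."""
    out = job_file.with_suffix(".root")
    out.touch()
    return out


def merge(outputs, merged):
    """Placeholder merge: record which outputs would be combined."""
    merged = pathlib.Path(merged)
    merged.parent.mkdir(parents=True, exist_ok=True)
    merged.write_text("\n".join(str(p) for p in outputs))
    return merged


workdir = pathlib.Path(tempfile.mkdtemp())
data = dataset(datatype="2016", eventtype="11104043", polarity="magup")
jobs_dir = split(data, workdir / "split_jobs")
# The number of job files is only known once split() has actually run.
job_files = sorted(jobs_dir.glob("*.txt"))
outputs = [process(job) for job in job_files]
merged = merge(outputs, workdir / "merge_root" / "hadded.root")
```

The point is that the list of job files can only be discovered after the split step has run, which is the part I don't know how to express as Snakemake rules.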
import os
import pathlib

rule get_ap_loc:
    output:
        # data = dataset(datatype="2016", eventtype="11104043", polarity="magup")
        data = ["path1", "path2", "path3"]
checkpoint split_data:
    input:
        data_loc = rules.get_ap_loc.output.data
    output:
        outdir = directory("temp/split_jobs/")
    run:
        pathlib.Path(output.outdir).mkdir(parents=True, exist_ok=True)
        subjob_numbers = []
        # write one text file per remote path, named after its index
        for subjob_number, remotepath in enumerate(input.data_loc):
            with open(os.path.join(output.outdir, f"{subjob_number}.txt"), "w") as file:
                file.write(remotepath)
            subjob_numbers.append(subjob_number)
# processes the i-th root subjob
rule process_job:
    input:
        subjob = "temp/split_jobs/{subjob_number}.txt"
    output:
        processed = "temp/split_jobs/{subjob_number}.root"
    run:
        stuff = pathlib.Path(output.processed)
        stuff.parent.mkdir(exist_ok=True)
        stuff.touch()
# collects paths to all root subjobs
def get_all_subjobs(wildcards):
    subjob_dir = checkpoints.split_data.get(**wildcards).output[0]
    subjob_numbers = glob_wildcards(f"{subjob_dir}/{{subjob_number}}.txt").subjob_number
    # process_job names its output "processed" (there is no "bam" output)
    all_subjobs = expand(rules.process_job.output.processed, **wildcards, subjob_number=subjob_numbers)
    return all_subjobs
# will hadd the root subjobs
rule merge_subjobs:
    input:
        get_all_subjobs
    output:
        hadded = "temp/merge_root/hadded.root"
    shell:
        "cat {input} > {output.hadded}"
Are checkpoints the right tool for this kind of case?
Thanks