I want to read multiple .txt files with dask.bag.read_text and record the file path and line number of each line for further processing.
The read_text function has an include_path=True argument that keeps the path, but how can I get the line number? Does read_text() preserve the line order after reading the files?
import dask.bag as db

def add_line_number(element):
    # with include_path=True, each element is a (line, path) tuple
    line, path = element
    # how to get the line number?
    line_index = ...
    return line, path, line_index

b = db.read_text([file1, file2, ...], blocksize='10 MiB', include_path=True)
b = b.map(add_line_number).compute()
# expect b to be: [('line 1', 'file path', ith line in that file), ...]
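With include_path=True each element of the bag is a (line, path) tuple, which is what the unpacking above assumes; a quick sanity check might look like this (file names are placeholders):

import dask.bag as db

b = db.read_text(['a.txt', 'b.txt'], include_path=True)
b.take(1)  # e.g. (('first line of a.txt\n', '/abs/path/a.txt'),)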
Edit 2
Thanks for your help. Here are some further questions.
Does read_text guarantee that line order is preserved within each partition when files_per_partition=1 is set?
In the source code, each file seems to be read line by line:
with OpenFile(path) as f:
    for line in f:
        yield line  # or (line, path) when include_path=True
Will the parallelization affect the line sequence, or will this job only be executed by one worker?
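If order is preserved, I imagine a sketch like the following would give per-file line numbers (assuming each partition holds exactly one whole file, read in order; blocksize is left unset so that a partition really is one whole file):

import dask.bag as db

def enumerate_partition(partition):
    # partition = all lines of one file, in file order
    return [(line, path, i) for i, (line, path) in enumerate(partition)]

b = db.read_text([file1, file2, ...], files_per_partition=1, include_path=True)
numbered = b.map_partitions(enumerate_partition).compute()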
What you want is not really possible. When dask splits up a large file, it reads from arbitrary byte offsets (every 10 MiB in your example) and assumes that the next newline character marks the start of a new line. Thus, for a given chunk (which gets processed in parallel with the previous chunks), it cannot know how many lines preceded it.
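To see this concretely: a file larger than the blocksize becomes several partitions, and the line count of each partition is unknown until that partition is actually read (file name is a placeholder):

import dask.bag as db

b = db.read_text('big.txt', blocksize='10 MiB')
b.npartitions  # roughly file_size / 10 MiB; per-partition line counts are not known up front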
You can easily enough enumerate the lines of a given chunk (with .map_partitions(), not .map()), and you can find the number of lines in all chunks, but not in a single pass. To find the number of lines per chunk:
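A minimal sketch (reusing the placeholder file list and blocksize from the question):

import dask.bag as db

b = db.read_text([file1, file2, ...], blocksize='10 MiB')

# map each partition to a one-element list holding that partition's line count
counts = b.map_partitions(lambda part: [sum(1 for _ in part)]).compute()

A second pass can then turn these counts into each chunk's starting line number with a prefix sum, e.g. offsets = [0] + list(itertools.accumulate(counts))[:-1]. Note that these offsets are global across all files in partition order, so per-file numbering would additionally require resetting the count at file boundaries.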