In a text file, count lines from string 'foo' to first empty line afterwards. Raise exception if 'foo' not found


Background: I want to read some data from a text file into a polars dataframe. The data starts at the line containing the string foo and stops at the first empty line afterwards. Example file test.txt:

stuff to skip
more stuff to skip


skip me too

foo bar foobar
1   2   A
4   5   B
7   8   C


other stuff
stuff

pl.read_csv has args skip_rows and n_rows. Thus, if I can find the line number of foo and the line number of the first empty line afterwards, I should be able to read the data into a polars dataframe. How can I do that? I'm able to find skip_rows:

from pathlib import Path

file_path = Path('test.txt')

with open(file_path, 'r') as file:
    skip_rows = 0
    n_rows = 0
    for line_number, line in enumerate(file, 1):
        if 'foo' in line:
            skip_rows = line_number - 1

But how can I also find n_rows without scanning the file again? Also, the solution must handle the case when there's no line containing foo, e.g.

stuff to skip
more stuff to skip


skip me too

1   2   A
4   5   B
7   8   C


other stuff
stuff

In that case, I would like to either return a value indicating that foo was not found, or raise an exception so that the caller knows something went wrong (maybe a ValueError exception?).

EDIT: I forgot an edge case. Sometimes the data may continue until the end of the file:

stuff to skip
more stuff to skip


skip me too

foo bar foobar
1   2   A
4   5   B
7   8   C

There are 5 best solutions below

Answer from Andrej Kesely

You can try:

start, end = None, None
with open("your_file.txt", "r") as f_in:
    for line_no, line in enumerate(map(str.strip, f_in)):
        if line.startswith("foo"):  # or use `if "foo" in line:`
            start = line_no
        elif start is not None and line == "":
            end = line_no
            break
    else:
        # loop finished without a break: either the data ran to the
        # end of the file, or `foo` was never found
        if start is not None:
            end = line_no
        else:
            print("foo not found!")

if start is not None:
    print(f"{start=} {end=}")

Prints (with the first input from your question):

start=6 end=10
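
If you need the skip_rows/n_rows values from the question, they follow directly from start and end. A small sketch (note the off-by-one difference between the blank-line case and the end-of-file case):

skip_rows = start         # lines before the 'foo' header line
n_rows = end - start - 1  # data rows, when `end` is the blank line
# n_rows = end - start    # when the data ran to the end of the file
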
Answer from OysterShucker

Here is a possible solution that covers some edge cases:

  1. it will not find "foo" in "foobar"
  2. it will find "Foo", "fOO", "FOO", etc.
  3. it stops at the first empty line or the end of the file, whichever is first

try:
    with open('test.txt', 'r') as lines:
        for row, line in enumerate(lines):
            # maybe "foo" is present but mixed or uppercase
            # split on space so we find exactly "foo" and not "foo" in "footage"
            if 'foo' in line.lower().split(' '):
                for n_row, ln in enumerate(lines, row+1):
                    if not ln.strip(): 
                        break
                else:
                    # end of file, this line doesn't actually exist
                    # which is fine if you use this number for `stop` with range or splice
                    n_row += 1
                break
        else:
            raise Exception
except:
    print("foo was not found")
else:
    print(row, n_row) #6, 10

You may want to consider that getting the row numbers will likely mean going over the data a second time. With very little modification you can get the data directly and still keep the row numbers.

try:
    with open('test.txt', 'r') as lines:
        for row, line in enumerate(lines):
            if 'foo' in line.lower().split(' '):
                data = [line.strip()]
                for n_row, ln in enumerate(lines, row+1):
                    if not (line := ln.strip()): 
                        break
                    data.append(line)
                else:
                    n_row += 1
                break
        else:
            raise Exception
except:
    print("foo was not found")
else:
    print(row, n_row)
    print(*data, sep='\n')
output
6 10
foo bar foobar
1   2   A
4   5   B
7   8   C

Here's a version that skips all the row humbug and gets straight to parsing the data and turning it into a dataframe.

import polars
from io import StringIO

try:
    with open('test.txt', 'r') as lines:
        for line in lines:
            if 'foo' in line.lower().split(' '):
                data = line
                for ln in lines:
                    if not ln.strip(): break
                    data += ln
                break
        else:
            raise Exception
except:
    print("foo was not found")
else:
    print(polars.read_csv(StringIO(data)))
output
shape: (3, 1)
┌────────────────┐
│ foo bar foobar │
│ ---            │
│ str            │
╞════════════════╡
│ 1   2   A      │
│ 4   5   B      │
│ 7   8   C      │
└────────────────┘
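
Note that the single str column above is because read_csv defaults to a comma separator and the data contains no commas. If your real file is, say, tab-delimited (an assumption), passing the separator splits it into proper columns:

print(polars.read_csv(StringIO(data), separator="\t"))
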
Answer from jqurious

next() with a generator expression can be a useful pattern for this.

If no value is produced, next() raises a StopIteration exception, which you can catch and report to the caller.

with open("test.txt") as f:
    f = enumerate(f)
    
    try: 
        skip_rows = next(n for n, line in f if "foo" in line)
        
    except StopIteration:
        raise ValueError("Start line not found.")
        
    for n, line in f:
        if line.strip() == "":
            n -= 1
            break
            
    n_rows = n - skip_rows
    
    print(f"{skip_rows=}")
    print(f"{n_rows=}")
skip_rows=6
n_rows=3
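
From here the actual read is just a matter of passing those values through, e.g. (a sketch, assuming tab-delimited columns):

import polars as pl

df = pl.read_csv(
    "test.txt",
    skip_rows=skip_rows,  # lines before the 'foo' header line
    n_rows=n_rows,        # data rows before the blank line
    separator="\t",       # assumption: the columns are tab-delimited
)
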
Answer from Dean MacGregor

Here are two all-polars solutions; each relies on the data rows having a fixed line length to distinguish what is data and what is not.

Scan for everything approach

csv_scan = (
    pl.scan_csv("ex.txt", 
                separator="\0", # null character as sep so just one column
                has_header=False, 
                schema={"a": pl.String} # name column here
                )
    .filter(pl.col('a').is_not_null()) 
)


dfdata = (
    csv_scan
    # this is kind of like a row id but only increments at unique values
    .group_by(z=pl.col("a").str.len_chars().rle_id(), maintain_order=True)
    .agg("a", len=pl.len())
    # Assume that data rows have the same string length, that the data
    # rows will be continuous, and that the meta rows won't have as many
    # same str length rows in a row as the data
    .with_columns(z1=pl.col("z").filter(pl.col("len").max() == pl.col("len")))
    .filter(
        (pl.col("z") == pl.col('z1'))
        | (pl.col("z") + 1 == pl.col('z1'))
    )
    # extract all based on \S regex, can't use extract_groups b/c we
    # don't know how many proper columns there are
    .select(
        pl.col("a")
        .explode()
        .str.extract_all(r"(\S)+")
        .list.to_struct()
    )
    .unnest("a")
    .collect()
    .pipe(
        lambda df: (
            # promote 0th row to header and discard it from data itself
            df[1:].rename(
                {f"field_{x}": y 
                 for x, y in enumerate(next(df.iter_rows()))})
        )
    )
    # TODO: cast columns to intended datatypes (see below)
)

Scan for offsets approach then reread

csv_scan = pl.scan_csv(
    "ex.txt",
    separator="\0",  # null character as sep so just one column
    has_header=False,
    schema={"a": pl.String},  # name column here
)

offsets = (
    csv_scan.with_columns(i=pl.int_range(pl.len()))
    .group_by(z=pl.col("a").str.len_chars().rle_id(), maintain_order=True)
    .agg("a", "i", len=pl.len())
    .filter(
        (pl.col("z") == pl.col("z").filter(pl.col("len").max() == pl.col("len")))
        | (pl.col("z") + 1 == pl.col("z").filter(pl.col("len").max() == pl.col("len")))
    )
    .collect()
)

df = pl.read_csv(
    "ex.txt",
    skip_rows=offsets["i"][0][0] + 1,
    has_header=False,
    separator=" ",
    n_rows=offsets["len"][-1],
).pipe(lambda df: (
    df.select(
        df.melt()
        .filter(pl.col('value').is_not_null())
        .get_column('variable')
        .unique(maintain_order=True)
        .to_list()
        )
))
df.columns = offsets['a'][0].str.replace_all(r"(\s+)", " ").str.split(" ")[0]

The second approach has the benefit that read_csv can infer the data types, but when I run it the foo column is still being inferred as strings, so maybe the first approach with manual dtype casting is best.

Try to cast strings to numerics

for col in dfdata.columns:
    # strict casts (the default) raise if any value can't be parsed,
    # so a failed cast just falls through to the next dtype
    try:
        dfdata = dfdata.with_columns(pl.col(col).cast(pl.Int64))
        continue
    except Exception:
        pass
    try:
        dfdata = dfdata.with_columns(pl.col(col).cast(pl.Float64))
        continue
    except Exception:
        pass
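
As an aside, the same casting can be done without try/except by using non-strict casts, which turn failed conversions into nulls. A sketch (not part of the original answer):

for col in dfdata.columns:
    original = dfdata.get_column(col)
    for dtype in (pl.Int64, pl.Float64):
        casted = original.cast(dtype, strict=False)
        # keep the cast only if it didn't turn any value into a null
        if casted.null_count() == original.null_count():
            dfdata = dfdata.with_columns(casted)
            break
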
Answer from ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ

Here's a simple approach:

  1. read the csv as a one-column csv file
  2. use Polars string matching to find the offsets
  3. use write_csv to write the correct slice of the dataframe to a StringIO buffer
  4. use read_csv with the StringIO buffer as the source to parse your csv as needed.

import polars as pl
from polars import DataFrame
from io import StringIO

def read_received_csv(path_to_file: str) -> DataFrame | None:
    """
    Read CSV file from the first occurrence of 'foo' until the next blank line or end of file.

    If 'foo' is not found, return None.
    """
    df_initial_read = pl.read_csv(
        path_to_file,
        has_header=False,
        separator="\0",  # no separator
        row_index_name="index",
        missing_utf8_is_empty_string=True,
        new_columns=["index", "unparsed_string"],
    )

    # Find the offset of the first occurrence of "foo".
    df_foo = df_initial_read.filter(pl.col("unparsed_string").str.contains(r"foo"))
    if df_foo.is_empty():
        return None

    slice_offset = df_foo.get_column("index").item(0)

    # Starting with the offset above, find the first blank line that follows "foo", if any
    df_next_blank = df_initial_read.slice(slice_offset).filter(
        pl.col("unparsed_string").str.strip_chars() == ""
    )

    if df_next_blank.is_empty():
        slice_length = None
    else:
        slice_length = df_next_blank.get_column("index").item(0) - slice_offset

    # Take a "slice" of the initial unparsed csv, write it out to a string buffer, and then
    # allow Polars to parse the string buffer per usual.
    return pl.read_csv(
        StringIO(
            df_initial_read.select("unparsed_string")
            .slice(slice_offset, slice_length)
            .write_csv(include_header=False)
        ),
        separator="\t",
    )

For your three cases above (and assuming the tab character as your delimiter), this returns:

(with foo and a blank line)

>>> read_received_csv("data/raw/so_78196632/test1.txt")
shape: (3, 3)
┌─────┬─────┬────────┐
│ foo ┆ bar ┆ foobar │
│ --- ┆ --- ┆ ---    │
│ i64 ┆ i64 ┆ str    │
╞═════╪═════╪════════╡
│ 1   ┆ 2   ┆ A      │
│ 4   ┆ 5   ┆ B      │
│ 7   ┆ 8   ┆ C      │
└─────┴─────┴────────┘

(no occurrence of foo)

>>> read_received_csv("data/raw/so_78196632/test2.txt")

(foo occurs, but no blank line follows it)

>>> read_received_csv("data/raw/so_78196632/test3.txt")
shape: (3, 3)
┌─────┬─────┬────────┐
│ foo ┆ bar ┆ foobar │
│ --- ┆ --- ┆ ---    │
│ i64 ┆ i64 ┆ str    │
╞═════╪═════╪════════╡
│ 1   ┆ 2   ┆ A      │
│ 4   ┆ 5   ┆ B      │
│ 7   ┆ 8   ┆ C      │
└─────┴─────┴────────┘

It's a relatively simple approach, though perhaps not appropriate for very large files, as it reads the entire contents of the file into RAM.
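
And if the caller should get the ValueError asked for in the question rather than a None return, a tiny (hypothetical) wrapper at the call site covers it:

df = read_received_csv("test.txt")
if df is None:
    raise ValueError("'foo' not found in file")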