How to read the latest line from the csv file using ReadLineAsync method?

This is my code to read the CSV file asynchronously using the ReadLineAsync() method of the StreamReader class, but it only reads the first line of the file:

    private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken) {
     Stopwatch sw = new Stopwatch();
     sw.Start();

     string filePath = @"/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";

     using(StreamReader oStreamReader = new StreamReader(File.OpenRead(filePath))) {
       string sFileLine = await oStreamReader.ReadLineAsync();

       string[] jointDataArray = sFileLine.Split(',');

       // Assuming the joint data is processed in parallel
       var tasks = new List<Task>();

       // Process joint pose
       tasks.Add(Task.Run(async () => {
         var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
         var jointPoseJson = JsonSerializer.Serialize(jointPose);
         await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
       }));

       // Process joint velocity
       tasks.Add(Task.Run(async () => {
         var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
         var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
         await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
       }));

       // Process joint acceleration
       tasks.Add(Task.Run(async () => {
         var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
         var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
         await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
       }));

       // Process external wrench
       tasks.Add(Task.Run(async () => {
         var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
         var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
         await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
       }));

       await Task.WhenAll(tasks);
     }

     sw.Stop();
     _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
   }

The CSV file has 10128 lines. I want to read the latest line each time one is added to the file.

How do I do it?

Passing File.ReadLines(filePath).Last() to the StreamReader constructor, as in the code below, throws this exception:

Unhandled exception. System.IO.PathTooLongException: The path '/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/-2.27625e-06,-0.78542,-3.79241e-06,-2.35622,5.66111e-06,3.14159,0.785408,0.00173646,-0.0015847,0.000962475,-0.00044469,-0.000247682,-0.000270337,0.000704195,0.000477503,0.000466693,-6.50664e-05,0.00112044,-2.47425e-06,0.000445592,-0.000685786,1.21642,-0.853085,-0.586162,-0.357496,-0.688677,0.230229' is too long, or a component of the specified path is too long.

private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken) {
  Stopwatch sw = new Stopwatch();
  sw.Start();

  string filePath = @"/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";

  using(StreamReader oStreamReader = new StreamReader(File.ReadLines(filePath).Last())) {
    string sFileLine = await oStreamReader.ReadLineAsync();

    string[] jointDataArray = sFileLine.Split(',');

    // Assuming the joint data is processed in parallel
    var tasks = new List<Task>();

    // Process joint pose
    tasks.Add(Task.Run(async () => {
      var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
      var jointPoseJson = JsonSerializer.Serialize(jointPose);
      await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
    }));

    // Process joint velocity
    tasks.Add(Task.Run(async () => {
      var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
      var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
      await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
    }));

    // Process joint acceleration
    tasks.Add(Task.Run(async () => {
      var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
      var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
      await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
    }));

    // Process external wrench
    tasks.Add(Task.Run(async () => {
      var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
      var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
      await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
    }));

    await Task.WhenAll(tasks);
  }

  sw.Stop();
  _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
}

Best answer, by Corey:

The slow (but reliable) method is to read all of the lines in the file and return the last one. Something like:

static string? GetLastLine(string filename)
{
    using StreamReader reader = new(filename);
    string? last = null;
    while (reader.ReadLine() is string line)
        last = line;
    return last;
}

Of course this allocates a lot of strings for no good reason, and the GC will have to get rid of them all shortly afterwards. Not ideal, but that's how it goes. (The async version is simple enough to make, but this runs in a few ms per MB. Up to you.)
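
Since the question asks for ReadLineAsync specifically, here is a minimal sketch of what the async conversion of the method above might look like. The class name `LastLineReader` is only an illustrative wrapper, not something from the question or from any library.

```csharp
using System.IO;
using System.Threading.Tasks;

static class LastLineReader
{
    // Reads the whole file asynchronously and returns its final line,
    // or null when the file contains no lines at all.
    public static async Task<string?> GetLastLineAsync(string filename)
    {
        using StreamReader reader = new(filename);
        string? last = null;
        // ReadLineAsync returns null once the end of the file is reached.
        while (await reader.ReadLineAsync() is string line)
            last = line;
        return last;
    }
}
```

It still reads every line, so the cost grows with the size of the file just like the synchronous version.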

To get it to go faster you can try just reading the last part of the file - enough to get a couple of lines - and process that. You'll need to know the maximum line length for this, just so you can guarantee that you get at least one full line. Assuming that you're working with ASCII or UTF8 data you can probably do something like this:

static string? GetLastLine(string filename, int maxLineLength)
{
    using Stream stream = File.OpenRead(filename);
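    // Seek back far enough to cover at least one full line, but never
    // before the start of the file (a negative Position throws).
    stream.Position = Math.Max(0, stream.Length - maxLineLength * 3L / 2);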
    using StreamReader reader = new(stream);
    string? last = null;
    while (reader.ReadLine() is string line)
        last = line;
    return last;
}

(This should work in fairly constant time regardless of the size of the file, since it will always process the same amount of data - about 100 microseconds on my machine for a 1MB file using maxLineLength = 150. Again, conversion to async version is simple.)
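
The tail-reading idea above could be converted to async roughly as follows. This is a sketch under a few assumptions: the class name `TailReader` is hypothetical, the data is ASCII or UTF-8 as stated above, and the file is opened with `FileShare.ReadWrite` on the guess that another process is still appending to it (whether that works also depends on how the writer opened the file).

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class TailReader
{
    // Returns the last line of the file by reading only its tail.
    // maxLineLength must be at least as long as the longest line in bytes.
    public static async Task<string?> GetLastLineAsync(string filename, int maxLineLength)
    {
        // Open for asynchronous reads and allow other processes to keep writing.
        using FileStream stream = new(filename, FileMode.Open, FileAccess.Read,
            FileShare.ReadWrite, bufferSize: 4096, useAsync: true);

        // Start 1.5 line lengths before the end, clamped to the start of the file.
        stream.Position = Math.Max(0, stream.Length - maxLineLength * 3L / 2);

        using StreamReader reader = new(stream);
        string? last = null;
        // The first line read may be a fragment; only the final one is kept.
        while (await reader.ReadLineAsync() is string line)
            last = line;
        return last;
    }
}
```

If `maxLineLength` is smaller than the real last line, the result is a truncated fragment rather than an error, so it is worth choosing the value generously.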

And finally, if you're expecting lines to be added to the file over time and you want to just read the fresh lines from the file, keep track of the file size and resume reading from there when the file size changes.


As pointed out in the comments, the above isn't particularly useful if you want to read new lines as they come in from outside your program. Not only does it always return the last line of the file, regardless of changes, but if multiple lines are added you'll only get the last one. Almost certainly not what you want.

What you probably want is a way to read new lines as they come in, and not skip any new lines. This means tracking changes in the file's length and resuming read one line at a time. You don't want to lock the file too long, and since StreamReader buffers ahead you can't rely on stream.Position to line up with the end of the line you just read.

The solution is to read all the lines you can each time and save them for subsequent reads. We can pull new lines into a Queue<> and track the file's size to detect when there's more to read. Something like this:

sealed class LineReader
{
    // File to read from.
    private readonly string _filename;
    
    // Queue for when multiple new lines are added between reads.
    private readonly Queue<string> _queue = new();
    
    // Length of file after last read.
    private long _lastPosition;
    
    // True if there's something to read.
    public bool LinesAvailable
    {
        get 
        {
            if (_queue.Count > 0)
                return true;
            FileInfo fi = new(_filename);
            return fi.Exists && fi.Length != _lastPosition;
        }
    }
    
    public LineReader(string filename, bool readExistingLines = false)
    {
        _filename = filename;
        FileInfo fi = new(_filename);
        _lastPosition = readExistingLines || !fi.Exists ? 0 : fi.Length;
    }
    
    // Returns when a line is read or token is cancelled.
    public async Task<string?> WaitNextLineAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (_queue.TryDequeue(out string? queued))
                return queued;
            if (await GetNextLineAsync() is string line)
                return line;
            try
            {
                await Task.Delay(20, token);
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }
        return null;
    }
    
    // Read next line, or `null` if none available.
    public async Task<string?> GetNextLineAsync()
    {
        if (_queue.TryDequeue(out string? line))
            return line;
        
        // Check if file length has changed.
        FileInfo fi = new(_filename);
        if (!fi.Exists || fi.Length == _lastPosition)
            return null;
        
        // Open the stream. May fail if file is locked.
        Stream stream;
        try
        {
            stream = fi.OpenRead();
        }
        catch 
        {
            return null;
        }
        
        using (stream)
        {
            // If file is smaller assume it was cleared, read all lines.
            // Otherwise, go to our last position.
            if (stream.Length >= _lastPosition)
                stream.Position = _lastPosition;
            
            using StreamReader reader = new(stream);
            
            // Queue up new lines
            while (await reader.ReadLineAsync() is string next)
            {
                // Ignore empty lines.
                if (!string.IsNullOrEmpty(next))
                    _queue.Enqueue(next);
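            }

            // Record how far we read while the stream is still open;
            // stream.Length cannot be queried once the stream is disposed.
            _lastPosition = stream.Position;
        }

        // Return the oldest queued line, or null if nothing new was read.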
        _queue.TryDequeue(out line);
        return line;
    }
}

GetNextLineAsync() tries to retrieve the next available line, while WaitNextLineAsync(...) polls asynchronously until a line is read. Cancellation exits instead of throwing, which is handy if you want to provide a timeout with a CancelAfter token:

async Task ProcessNextLineAsync(LineReader reader, CancellationToken token)
{
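    // Dispose the linked source so its timer and registration are released.
    using CancellationTokenSource source = CancellationTokenSource.CreateLinkedTokenSource(token);
    source.CancelAfter(TimeSpan.FromSeconds(1));

    // Call the method on the reader instance that was passed in.
    if (await reader.WaitNextLineAsync(source.Token) is string line)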
    {
        // do something with the line
    }
}

(Yes, I could have made a much simpler version. You asked for async though.)
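
For comparison, a simpler synchronous version might look something like the sketch below. `SimpleLineTail` and `ReadNewLines` are hypothetical names, and it assumes the writer only ever appends complete lines; a half-written line would be returned as if it were whole.

```csharp
using System.Collections.Generic;
using System.IO;

sealed class SimpleLineTail
{
    private readonly string _filename;

    // Byte offset just past the data consumed so far.
    private long _offset;

    public SimpleLineTail(string filename, bool readExistingLines = false)
    {
        _filename = filename;
        FileInfo fi = new(filename);
        _offset = readExistingLines || !fi.Exists ? 0 : fi.Length;
    }

    // Returns every non-empty line appended since the previous call.
    public List<string> ReadNewLines()
    {
        List<string> lines = new();
        if (!File.Exists(_filename))
            return lines;

        using FileStream stream = new(_filename, FileMode.Open,
            FileAccess.Read, FileShare.ReadWrite);

        // A shorter file is treated as truncated: start again from the top.
        if (stream.Length < _offset)
            _offset = 0;
        stream.Position = _offset;

        using StreamReader reader = new(stream);
        while (reader.ReadLine() is string line)
        {
            if (line.Length > 0)
                lines.Add(line);
        }

        // The reader ran to the end of the file, so Position marks the resume point.
        _offset = stream.Position;
        return lines;
    }
}
```

Calling `ReadNewLines()` on a timer or from a polling loop returns all lines added since the last call, in order, so none are skipped when several arrive at once. Like the class above, it is not thread safe.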

NOTE: this is not thread safe.