Working with large files with a particular extension using directory scan operator

I have a file of 1 GB or more arriving in my directory from MQ. The transfer takes some time to finish, but the file shows up in the directory before it is complete. I am afraid my DirectoryScan operator will pick up an incomplete file. I also cannot add an initial delay, because I do not know how long the transfer will take.

PS: I read somewhere that some file transfer protocols handle this by giving the file a different extension until it is complete. For example, if my DirectoryScan operator is waiting for any file with a .txt extension, the file transfer protocol would create the file with a .abc extension until the transfer is complete.

How should I go ahead with this?

ndsilva On BEST ANSWER

If you go the regular expression route, here is an example of invoking the operator so that it only picks up files with a certain extension:

// DirectoryScan operator with an absolute directory argument and a file name pattern
stream<rstring name> Dir2 = DirectoryScan()
{
  param
    directory : "/tmp/work";
    pattern : "\\.txt$";
}

If that doesn't work, is it possible to set up MQ to write the file to a different directory and then move it into your target directory when it is complete?
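To make the "write elsewhere, then move" idea concrete, here is a rough sketch in Python of what the delivering side could do. It is only an illustration: `deliver_when_complete`, `staging_dir` and `target_dir` are hypothetical names, not part of MQ or Streams, and it assumes the staging directory and the scanned directory are on the same file system, so that the final rename is a single atomic step and the scanner never sees a partial file.

```python
import os
import shutil


def deliver_when_complete(source_path, staging_dir, target_dir):
    """Copy a file into a staging directory, then rename it into the
    directory that DirectoryScan watches (hypothetical helper)."""
    name = os.path.basename(source_path)
    staged = os.path.join(staging_dir, name)
    final = os.path.join(target_dir, name)

    # Slow part: the copy happens outside the scanned directory,
    # so a half-written file is never visible to the scanner.
    shutil.copyfile(source_path, staged)

    # Fast part: on the same file system this rename is atomic,
    # so the file appears in target_dir only once it is complete.
    os.replace(staged, final)
    return final
```

The same effect can be had inside a single directory by writing the file under a temporary extension (such as .abc) and renaming it to .txt at the end, which is what the question describes.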

Another thing you could do, if you know the size of the file, is use the Size() output function of DirectoryScan to ignore the file until it has reached the right size. This snippet uses a Custom operator that only passes a file name on once the reported file size is at least 2000 bytes.

graph
    stream<rstring filename, uint64 size> DirScanOutput = DirectoryScan() {
        param
            directory: "test1";
            sleepTime: 10.0; // wait 10s between scans
            pattern: ".*\\.txt";
        output
            DirScanOutput: size = Size();
    }

    // Forward the file name only once the file is large enough
    stream<rstring file> FileNameStream = Custom(DirScanOutput as In) {
        logic
            onTuple In: {
                if (size < 2000ul) {
                    printStringLn("Required size not met yet.");
                } else {
                    printStringLn("Size of file reached.");
                    submit({file = filename}, FileNameStream);
                }
            }
    }

    // cityData is assumed to be a tuple type defined elsewhere
    stream<cityData> CityDataRecord = FileSource(FileNameStream) {
        param
            format: csv;
    }

I hope one of these suggestions works for you.