External sorting text lines in a huge file lexicographically using C++

1.5k Views Asked by At

We have 200Gb file filled with text lines divided by "\n". Our server has Linux on board, gcc, 8GB RAM and unlimited hard disk space. The requirement is to implement in C++ the most efficient way to lexicographically sort lines of this file from our point of view.

I have done the external sorting of text lines lexicographically for the input file (size up to 2GB) correctly. But when I test with a file size bigger than 2GB, the output is incorrect. The maximum file size needs to be tested bigger than 200GB.

Below is my code that worked well with file size lower than 2GB. Program has 3 parameters as command line argument: input file name, output file name and memory limit in bytes (we will test with different memory limits, not only 8GB) The program should be able to work correctly with simple boundary cases, e.g. with small files, empty files, files much bigger than 200GB. It should work well with non-ascii symbols, but we can assume that zero-byte is not present in the file. We can also assume that memory limit is much bigger than the size of the longest line.

Here's the code. Please help me point out the reason why it can't produce correct result with file size bigger than 2GB. I would appreciate if you can give me a modified version of my code below to make it works in all cases.

#include <stdio.h>
#include <stdlib.h>
#include <fstream>
#include <sstream>
#include <iostream> 
#include <algorithm>
#include <vector>
#include <string>
#include <iterator>

//using namespace std;

#define FILESIZE  200000000000          // size of the file on disk
#define TOTAL_MEM 7000000000            // max items the memory buffer can hold

typedef std::vector<std::string> DataContainer;


// Function to sort lines of text according to their length
/*bool compare(std::string &a, std::string &b) 
{ 
    if (a.size() < b.size())
        return true;
    else if (a.size() > b.size())
        return false;
    else
        return a < b;

    //return strcasecmp( a.c_str(), b.c_str() ) <= 0;
} 

long estimateSizeOfBlocks(long sizeoffile, int maxtmpfiles, long maxMemory) 
{

}
*/
std::string ToString(int val) {
    std::stringstream stream;
    stream << val;
    return stream.str();
}

void ExternalSort(std::string infilepath, std::string outfilepath)
{
    std::string line, new_line;
    std::string tfile = "temp-file-";
    int i, j, k;
    DataContainer v;

    std::ifstream infile;

    if(!infile.is_open())
    {   
        std::cout << "Unable to open file\n";           
    }
    infile.open(infilepath, std::ifstream::in);

    // Check if input file is empty
    if (infile.peek() == std::ifstream::traits_type::eof())
    {
        std::cout << "File is empty";
    }

    //std::vector<std::string> x(std::istream_iterator<std::string>(infile), {});

    // Get size of the file
    infile.seekg (0, infile.end);
    int input_file_size = infile.tellg();
    infile.seekg (0, infile.beg);
    std::cout << "The size of the file chosen is (in bytes): " << input_file_size << "\n";

    // Estimate required number of runs to read all data in the input file
    int runs_count;
    if(input_file_size % TOTAL_MEM > 0)
        runs_count = input_file_size/TOTAL_MEM + 1;     // if file size is fixed at 200GB, we just need to define FILESIZE: runs_count = FILESIZE/TOTAL_MEM
    else
        runs_count = input_file_size/TOTAL_MEM;

    // Iterate through the elements in the file
    for(i = 0; i < runs_count; i++)
    {
        // Step 1: Read M-element chunk at a time from the file
        for (j = 0; j < (TOTAL_MEM < input_file_size ? TOTAL_MEM : input_file_size); j++)
        {
            while(std::getline(infile, line))
            {
                // If line is empty, ignore it
                if(line.empty())
                    continue;
                new_line = line + "\n";
                // Line contains string of length > 0 then save it in vector
                if(new_line.size() > 0)
                    v.push_back(new_line);
            }   
        }
        // Step 2: Sort M elements
        sort(v.begin(), v.end());       //sort(v.begin(), v.end(), compare); 

        // Step 3: Create temporary files and write sorted data into those files.
        std::ofstream tf;
        tf.open(tfile + ToString(i) + ".txt", std::ofstream::out | std::ofstream::app); 
        std::ostream_iterator<std::string> output_iterator(tf, "\n");
        std::copy(v.begin(), v.end(), output_iterator);

    /*  for(vector<std::string>::const_iterator it = v.begin(); it != v.end(); ++it)
            tf << *it << "\n";
        */
        tf.close();
    }   
    infile.close();

    // Step 4: Now open each file and merge them, then write back to disk
    DataContainer vn;
    for(i = 0; i < runs_count; i++)     
    {
        std::ifstream sortedFiles[runs_count];                              
        std::ostringstream filename;
        filename << tfile << i << ".txt";
        sortedFiles[i].open(filename.str(), std::ifstream::in);
        //sortedFiles[i].open(tfile + ToString(i) + ".txt", std::ifstream::in); 

        std::string st, new_st;
        while(std::getline(sortedFiles[i], st))
        {
            // If line is empty, ignore it
            if(st.empty())
                continue;
            new_st = st + "\n";
            if(new_st.size() > 0)
                vn.push_back(new_st);
        }
        sortedFiles[i].close();
    }
    // Step 5: Merge total sorted data
    sort(vn.begin(), vn.end());

    std::ofstream outfile;
    outfile.open(outfilepath, std::ofstream::out | std::ofstream::app);     
    std::ostream_iterator<std::string> output_iterator(outfile, "\n");
    std::copy(vn.begin(), vn.end(), output_iterator);

    outfile.close();    // Close final sorted file

}

int main(int argc, char** argv)         //      e.g.     argv[1] = "E:\\data.txt"           argv[2] = "E:\\sorted_data.txt"
{

    std::cout << "You have entered " << argc 
         << " arguments:\n"; 

    for (int i = 0; i < argc; ++i) 
        std::cout << argv[i] << "\n"; 

    ExternalSort(argv[1], argv[2]);

    return 0; 
}
1

There are 1 best solutions below

7
zdf On

See the edit below for a full implementation of external sort - no warranty.

Assuming you have two ascending sorted files (each number is a line):

  • File A: 1 3 5 7
  • File B: 10 2 4 6 8
  • The expected result is file C: 1 10 2 3 4 5 6 7 8

If you simply concatenate the files, as it seems you did, you will get: 1 3 5 7 10 2 4 6 8; which is not what you expected.

You should read from both files and write the lexicographically-smallest line. This can be easily understood with files containing the same number of lines and alternating lines (for instance, 1 3 5 & 2 4 6):

  1. Read the first line from A and B.
  2. Write the smallest to C.
  3. Read the next line from the file the written number belongs.
  4. Go to 2.

If the number of lines differs, once you reach the end of one file, write the remaining of the second file:

    File A: 1 3 5 7
    File B: 2 4 8

    A B    C
    --------
    1 2 -> 1
    3 2 -> 2
    3 4 -> 3
    5 4 -> 4
    5 8 -> 5
    7 8 -> 7
      8 -> 8

Once you have an algorithm that merges two files, run it on pair of files until you end with only one file. Assuming you have sorted files A, B, C & D:

  1. Merge A & B into AB.
  2. Delete A & B.
  3. Merge C & D into CD.
  4. Delete C & D.
  5. Merge AB & CD into ABCD. This is the result.
  6. Delete AB & CD.

    A   B  C   D
    |   |  |   |
     \ /    \ /
     AB     CD
      |      | 
      \     /
       ABCD
    

Here’s a two-way merger written in hurry – no warranty:

#include <iostream>
#include <iostream>
#include <fstream>
#include <string>
#include <algorithm>

class two_way_merge_t
{

public:
  two_way_merge_t(std::istream& a, std::istream& b, std::ostream& c) : a_{ a }, b_{ b }, c_{ c }
  {
    // nop
  }

  void execute()
  {
    using namespace std;

    // get first line from a; assumes a is not empty
    a_.get();

    // get first line from b; assumes b is not empty
    b_.get();

    // write whichever is less and advance
    while (write_one())
      ;
  }

protected:

  // helper
  struct reader_t
  {
    std::istream& is_;
    std::string line_;

    bool get()
    {
      return (bool)std::getline(is_, line_);
    }

    void write(std::ostream& os)
    {
      os << line_ << std::endl;
    }

    void write_rest(std::ostream& os)
    {
      do write(os); while (get());
    }
  };

  reader_t a_, b_;
  std::ostream& c_;

  bool write_one()
  {
    using namespace std;

    if (lexicographical_compare(a_.line_.begin(), a_.line_.end(), b_.line_.begin(), b_.line_.end()))
    {
      a_.write(c_);
      if (!a_.get())
        return b_.write_rest(c_), false;
    }
    else
    {
      b_.write(c_);
      if (!b_.get())
        return a_.write_rest(c_), false;
    }

    return true;
  }

};

void validate(std::istream& is) // lines count not validated
{
  using namespace std;

  string prev;
  string cur;
  while (getline(is, cur))
  {
    if (!lexicographical_compare(prev.begin(), prev.end(), cur.begin(), cur.end()))
      throw "bad two-way merge";
    swap(prev, cur);
  }
}

int main()
{
  using namespace std;

  {
    ifstream a("c:\\temp\\a.txt");
    if (!a)
      return -1;

    ifstream b("c:\\temp\\b.txt");
    if (!b)
      return -2;

    ofstream c("c:\\temp\\c.txt");
    if (!c)
      return -3;

    two_way_merge_t twm(a, b, c);
    twm.execute();
  }

  ifstream c("c:\\temp\\c.txt");
  if (!c)
    return -4;
  try
  {
    validate(c);
  }
  catch (const char* e)
  {
    cout << e;
    return -5;
  }
}

E D I T

#include <iostream>
#include <fstream>
#include <string>
#include <algorithm>
#include <vector>
#include <filesystem>

//-------------------------------------------------------------------
class two_way_merge_2_t
{

public:
  two_way_merge_2_t(std::vector<std::string>& a, std::istream& b, std::ostream& c) : a_{ a }, b_{ b }, c_{ c }
  {
    // nop
  }

  void execute()
  {
    using namespace std;

    // get first line from a; assumes a is not empty
    if ( ! a_.get() )
      return b_.write_rest(c_);

    // get first line from b; assumes a is not empty
    if (!b_.get())
      return a_.write_rest(c_);

    // we have managed to read from a and b: write whichever is less and advance
    while (write_one())
      ;
  }

protected:

  // helper
  struct isreader_t
  {
    std::istream& is_;
    std::string line_;

    bool get()
    {
      return (bool)std::getline(is_, line_);
    }

    void write(std::ostream& os)
    {
      os << line_ << std::endl;
    }

    void write_rest(std::ostream& os)
    {
      do write(os); while (get());
    }

    const std::string& line() const
    {
      return line_;
    }

  };

  // helper
  struct vreader_t
  {
    std::vector<std::string>& v_;
    std::vector<std::string>::size_type i_ = std::vector<std::string>::size_type (-1);

    bool get()
    {
      return ++i_ < v_.size();
    }

    void write(std::ostream& os)
    {
      os << v_[i_] << std::endl;
    }

    void write_rest(std::ostream& os)
    {
      do write(os); while (get());
    }

    const std::string& line() const
    {
      return v_[i_];
    }
  };

  vreader_t a_;
  isreader_t b_;
  std::ostream& c_;

  bool write_one()
  {
    using namespace std;

    if (lexicographical_compare(a_.line().begin(), a_.line().end(), b_.line().begin(), b_.line().end()))
    {
      a_.write(c_);
      if (!a_.get())
        return b_.write_rest(c_), false;
    }
    else
    {
      b_.write(c_);
      if (!b_.get())
        return a_.write_rest(c_), false;
    }

    return true;
  }

};


//-------------------------------------------------------------------
class partial_sort_t
{
public:
  partial_sort_t(std::istream& a, std::istream& b, std::ostream& c, std::size_t mem)
  : a_{ a }
  , b_{ b }
  , c_{ c }
  , mem_{ mem }
  {
    // nop
  }

  void execute()
  {
    sort();
    two_way_merge_2_t(va_, b_, c_).execute();
  }

protected:
  std::istream& a_;
  std::istream& b_;
  std::ostream& c_;
  const std::size_t mem_;
  std::vector<std::string> va_;

  void sort()
  {
    std::streamsize max_gpos = a_.tellg() + std::streamsize(mem_);
    std::string line;
    va_.clear();
    while (a_.tellg() < max_gpos && std::getline(a_, line) )
      va_.push_back(std::move(line));
    std::sort(va_.begin(), va_.end());
  }
};


//-------------------------------------------------------------------
class external_sort_t
{

public:
  external_sort_t(const char* fn, const std::size_t mem) : is_{ fn }, mem_{ mem }
  {
    // nop
  }

  const char* execute()
  {
    make_b();
    make_c();
    while (is_)
    {
      partial_sort_t(is_, b_, c_, mem_).execute();
      swap_bc();
    }
    return done();
  }

protected:
  const std::size_t mem_;
  std::ifstream is_;

  std::ifstream b_;
  std::string bpath_;

  std::ofstream c_;
  std::string cpath_;

  static std::string make_temp_file()
  {

    std::string fn(512, 0);
    tmpnam_s(&fn.front(), fn.size());
    std::ofstream os(fn);
    os.close();
    return fn;
  }

  void make_b()
  {
    b_.close();
    bpath_ = make_temp_file();
    b_.open(bpath_);
  }

  void make_c()
  {
    c_.close();
    cpath_ = make_temp_file();
    c_.open(cpath_);
  }

  void swap_bc()
  {
    b_.close();
    c_.close();
    std::swap(bpath_, cpath_);
    b_.open(bpath_);
    c_.open(cpath_);
  }

  const char* done()
  {
    b_.close();
    c_.close();
    std::experimental::filesystem::remove(cpath_);
    return bpath_.c_str();
  }

};

void validate(std::istream& is) // lines count not validated
{
  using namespace std;

  string prev;
  string cur;
  while (getline(is, cur))
  {
    if ( strcmp(prev.c_str(), cur.c_str()) > 0 )
      throw "bad two-way merge";
    swap(prev, cur);
  }
}

void validate(const char* fn)
{
  using namespace std;
  ifstream is{ fn };
  if (!is)
    throw "cannot open";
  validate(is);
}

void test_external_sort()
{
  using namespace std;

  external_sort_t es{ "c:\\temp\\a.txt", 10 };
  std::string rfn = es.execute();

  try
  {
    validate(rfn.c_str());
  }
  catch (const char* e)
  {
    cout << e << endl;
  }
}

int main()
{
  test_external_sort();
}