How to propagate exceptions from sub-functions in a threaded program?

I am trying to understand how exceptions can be propagated between functions and returned to the main function in C++. I have a small setup:

main.cpp:

int run () {
.
.
try {
  testException(file);
} catch (const std::exception &e) {
    std::cerr << "Error: " << e.what() << std::endl;
}
return 0;
}

test.cpp

std::exception_ptr g_exceptionPtr = nullptr;

void testThread() {
   std::this_thread::sleep_for(std::chrono::milliseconds(100));
   .
   .
}

void testException(const std::string &file) {
  TestCSV csv(file);
  try {
    std::thread input(testThread);
    csv.writeToCSV(file, a, b, c);
    input.join();
  } catch (const std::runtime_error &e) {
      g_exceptionPtr = std::current_exception();
      std::rethrow_exception(g_exceptionPtr);
  }
}

In test_csv.cpp:

TestCSV::writeToCSV(const std::string &file, const std::string &a, const std::string &b, const std::string &c) {
.
.
std::ofstream outFile(file);
    if (!outFile.is_open()) {
      throw std::runtime_error("Unable to open file for writing.");
    }
}

Now I want to propagate the error from the writeToCSV function and handle it in main.cpp. Currently, the exception is caught in test.cpp but is not re-thrown to main.cpp.

What is the issue and how can I resolve it?

P.S.: The code above is just an example; please let me know if any information is missing.

There are 3 best solutions below

8
Remy Lebeau On BEST ANSWER

You are calling writeToCSV() in the context of the same thread that is calling run(), so any exception that writeToCSV() throws and that is not caught will already propagate to run() as expected. You don't need to do anything extra for that. Simply don't catch the exception at all, or if you do catch it, re-throw it with a plain throw; statement rather than std::rethrow_exception(). Your usage of std::thread is irrelevant in this situation.

int run () {
  ...
  try {
    testException(file);
  } catch (const std::exception &e) {
    // this should catch whatever testException() throws...
    std::cerr << "Error: " << e.what() << std::endl;
  }
  return 0;
}

void testException(const std::string &file) {
  TestCSV csv(file);
  csv.writeToCSV(file, a, b, c);

  // or:

  try {
    TestCSV csv(file);
    csv.writeToCSV(file, a, b, c);
  }
  catch (const std::exception &e) {
    ...
    throw;
  }
}

On the other hand, if your question is about propagating exceptions across threads, then you should change your test to reflect that. Moving writeToCSV() into the std::thread would be a better test of std::current_exception and std::rethrow_exception.

std::current_exception allows you to capture a caught exception so you can access it outside of the catch. You can capture the exception in the original thread that threw it, move it to the desired thread, and then call std::rethrow_exception in that thread. In this case, that means checking, after you have joined the std::thread, whether the exception_ptr was assigned, and rethrowing it if so, e.g.:

void testThread(const std::string &file, std::exception_ptr &exceptionPtr) {
  try {
    TestCSV csv(file);
    csv.writeToCSV(...);
  } catch (const std::exception &) {
    exceptionPtr = std::current_exception();
  }
}

void testException(const std::string &file) {
  std::exception_ptr exceptionPtr = nullptr;
  std::thread writer(testThread, file, std::ref(exceptionPtr));
  ...
  writer.join();
  if (exceptionPtr)
    std::rethrow_exception(exceptionPtr);
}

int run () {
  ...
  try {
    testException(file);
  } catch (const std::exception &e) {
    // this should catch whatever testThread() throws...
    std::cerr << "Error: " << e.what() << std::endl;
  }
  return 0;
}
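
A related caveat for the layout in the question, where a std::thread is started before a call that may throw: destroying a std::thread that is still joinable calls std::terminate, so an exception that unwinds past the thread object before join() runs never reaches the handler. The following is a minimal sketch of one way to guard against that, assuming C++11. JoinGuard, failingWrite and exceptionReachesCaller are hypothetical names introduced only for this illustration and are not part of the original code.

```cpp
#include <stdexcept>
#include <thread>

// Hypothetical RAII helper (not part of the original code):
// joins the referenced thread when the guard goes out of scope.
class JoinGuard {
public:
  explicit JoinGuard(std::thread &t) : thread_(t) {}
  ~JoinGuard() {
    if (thread_.joinable())
      thread_.join();
  }
  JoinGuard(const JoinGuard &) = delete;
  JoinGuard &operator=(const JoinGuard &) = delete;

private:
  std::thread &thread_;
};

// Hypothetical stand-in for writeToCSV(): always throws.
void failingWrite() {
  throw std::runtime_error("Unable to open file for writing.");
}

// Returns true if the exception reaches the catch handler.
bool exceptionReachesCaller() {
  try {
    std::thread input([] {});  // stand-in for testThread
    JoinGuard guard(input);    // destroyed before `input`, so the join happens first
    failingWrite();            // throws while `input` is still joinable
  } catch (const std::runtime_error &) {
    return true;
  }
  return false;
}
```

Because locals are destroyed in reverse order of declaration, the guard joins the thread before the std::thread destructor runs. If C++20 is available, std::jthread joins in its destructor and could replace the guard.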
0
Yakk - Adam Nevraumont On

One approach to solving this is to use a std::future to deliver your results.

std::future<T> is an asymmetric union between a T and an exception. When you call .get() it will throw if it holds an exception.

There are helpers to work with std::future, namely std::packaged_task<R(Args...)> and std::promise<T>.
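
As a rough illustration of the std::promise route, here is a minimal sketch, assuming C++11. parseOrThrow and runOnThread are hypothetical names made up for this example; parseOrThrow merely stands in for any function that may throw on the worker thread.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical worker function: stands in for any call that may throw.
int parseOrThrow(const std::string &text) {
  if (text.empty())
    throw std::runtime_error("empty input");
  return static_cast<int>(text.size());
}

// Runs parseOrThrow() on a separate thread. Returns its result, or
// rethrows the worker's exception in the calling thread.
int runOnThread(const std::string &text) {
  std::promise<int> promise;
  std::future<int> result = promise.get_future();

  std::thread worker([&promise, &text] {
    try {
      promise.set_value(parseOrThrow(text));
    } catch (...) {
      // Store the exception in the shared state instead of
      // letting it escape the thread function.
      promise.set_exception(std::current_exception());
    }
  });

  worker.join();
  return result.get();  // returns the value or rethrows the stored exception
}
```

A caller can wrap runOnThread() in an ordinary try/catch, exactly as it would for a function running on the same thread.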

Suppose you have a thread pool that operates on a queue of tasks.

First, a threadsafe queue design:

template<class T>
struct threadsafe_queue {
  T pop();
  std::queue<T> pop_all();
  template<class Duration>
  std::optional<T> wait_or_pop( Duration );

  void push(T);
  void push_many(std::queue<T>);
private:
  auto lock() const { return std::unique_lock{m}; }
  mutable std::mutex m;
  std::condition_variable cv;
  std::deque<T> q;
};

The implementation is left up to the reader.
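
For orientation only, here is one possible sketch of the push() and pop() members, assuming a blocking pop() and C++11. The name simple_threadsafe_queue is hypothetical, and the remaining members of the interface above are deliberately omitted.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical, reduced version of the queue: push() and pop() only.
template<class T>
struct simple_threadsafe_queue {
  // Blocks until an element is available, then removes and returns it.
  T pop() {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [this] { return !q.empty(); });
    T value = std::move(q.front());
    q.pop_front();
    return value;
  }

  // Appends an element, then wakes one waiting consumer.
  void push(T value) {
    {
      std::lock_guard<std::mutex> l(m);
      q.push_back(std::move(value));
    }
    cv.notify_one();  // notify after releasing the lock
  }

private:
  std::mutex m;
  std::condition_variable cv;
  std::deque<T> q;
};
```

Moving the element out of the deque rather than copying it is what lets the queue hold move-only types such as std::packaged_task.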

Then a thread pool:

struct thread_pool {

  template<class F>
  auto queue_task( F f ) {
    using R = std::invoke_result_t<F&>;
    // packaged_task's constructor is explicit, so direct-initialize it
    std::packaged_task<R()> t( std::move(f) );
    std::future<R> result = t.get_future();
    q.push( std::packaged_task<void()>( std::move(t) ) );
    return result;
  }

  void start_thread( std::size_t n=1 );
private:
  threadsafe_queue<std::packaged_task<void()>> q;
  std::vector<std::thread> threads;
};

Now you pass your tasks to the thread_pool. They get run in a packaged_task. If a task throws, its exception is stored in the shared state behind the std::future.

To propagate the exception you just call .get() on the future.
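
To show just that last step without a full pool, here is a minimal sketch, assuming C++11, that runs a single std::packaged_task on its own std::thread. failingTask and runAndReport are hypothetical names used only for this illustration.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

// Hypothetical task that always fails, standing in for queued work.
int failingTask() {
  throw std::runtime_error("task failed");
}

// Runs the task on its own thread and reports what get() delivers.
std::string runAndReport() {
  std::packaged_task<int()> task(failingTask);
  std::future<int> result = task.get_future();

  // The packaged_task catches the exception and stores it in the
  // shared state, so nothing escapes the thread function.
  std::thread worker(std::move(task));
  worker.join();

  try {
    return std::to_string(result.get());  // rethrows the stored exception
  } catch (const std::exception &e) {
    return std::string("Error: ") + e.what();
  }
}
```

The worker thread needs no try/catch of its own here, because the packaged_task does the capturing.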

0
sam On

To catch exceptions from each thread started from main, create an array of exception_ptr objects, one per thread, and then handle each thread's exception in the main thread, as shown below.

#include <chrono>
#include <exception>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

using namespace std;

exception_ptr eptr[10];

void f(int i)
{
    try {
        auto id = this_thread::get_id();
        cout << "Thread " << id << " sleeping ..." << '\n';
        this_thread::sleep_for(chrono::seconds(1));
        if(i & 1) {
          stringstream ss;
          ss << id ;
          string str ("Exception from " + ss.str());
          throw std::runtime_error(str.c_str());
        }
    }
    catch(...)
    {
        eptr[i] = current_exception();
    }
}

int main(int argc, char **argv)
{
  thread th[10];
  for(int i = 0; i < 10; ++i) {
    this_thread::sleep_for(chrono::seconds(1));
    th[i] = thread(f,i);
  }

  for(int i = 0; i < 10; ++i) {
    if(th[i].joinable())
      th[i].join();

    if (eptr[i]) {
      try{
            std::rethrow_exception(eptr[i]);
      }
      catch(const std::exception &ex)
      {
            std::cerr << "Thread exited with exception: " << ex.what() << "\n";
      }
    }
  }

  return 0;
}

Example output:

$ ./excep_thread1
Thread 0xa000128e0 sleeping ...
Thread 0xa000229f0 sleeping ...
Thread 0xa00022b10 sleeping ...
Thread 0xa00022c10 sleeping ...
Thread 0xa000233a0 sleeping ...
Thread 0xa00023050 sleeping ...
Thread 0xa00023150 sleeping ...
Thread 0xa000234a0 sleeping ...
Thread 0xa000235a0 sleeping ...
Thread 0xa000236a0 sleeping ...
Thread exited with exception: Exception from 0xa000229f0
Thread exited with exception: Exception from 0xa00022c10
Thread exited with exception: Exception from 0xa00023050
Thread exited with exception: Exception from 0xa000234a0
Thread exited with exception: Exception from 0xa000236a0