Qt: running callback in the main thread from the worker thread

Suppose I have the following asynchronous task runner. It puts tasks in a queue and executes them one by one, calling a lambda on completion. If func yields a result of some type, the onComplete callback should receive an argument of that same type.

class TaskRunner
{
public:
    TaskRunner() :
        mPool(new QThreadPool())
    {
        mPool->setMaxThreadCount(1);
        mPool->setExpiryTimeout(-1);
    }
public:
    // For Funcs returning an instance of std::variant
    template <typename Func, typename OnComplete, typename OnError, typename... Args>
    std::enable_if_t<is_instance<std::invoke_result_t<Func, Args...>, std::variant>::value>
    async(Func func, OnComplete onComplete, OnError onError, Args... args)
    {
        QMutexLocker lock(&mutex);
        (void)QtConcurrent::run(mPool.get(), [=]() {
            const auto result = func(args...);
            if (result.index() == 0)
                onComplete(std::get<0>(result));
            else
                onError(std::get<1>(result));
        });
    }
    // ... similar templates for std::optional, etc. ...
private:
    QScopedPointer<QThreadPool> mPool;
    QMutex mutex;
};
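
The is_instance trait used in the enable_if condition above is not shown. A minimal sketch of what it might look like, assuming it only needs to test whether a type is a specialization of a class template whose parameters are all types (the definition below is hypothetical, not taken from the original code):

```cpp
#include <string>
#include <type_traits>
#include <variant>

// Primary template: T is not a specialization of Template.
template <typename T, template <typename...> class Template>
struct is_instance : std::false_type {};

// Partial specialization: chosen when T is exactly Template<Args...>.
template <template <typename...> class Template, typename... Args>
struct is_instance<Template<Args...>, Template> : std::true_type {};

// Compile-time checks of the two cases.
static_assert(is_instance<std::variant<int, std::string>, std::variant>::value);
static_assert(!is_instance<int, std::variant>::value);
```

With this definition a cv-qualified or reference type does not match, so a Func returning const std::variant<...>& would need std::remove_cv_t and std::remove_reference_t applied first.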

Usage:

TaskRunner runner;
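runner.async([](int a, int b) -> std::variant<int, std::string> {
    // The explicit return type is required: the two branches return different types.
    if (b != 0)
        return a / b;
    else
        return std::string("division by zero");},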
    [](auto result){ std::cout << result << '\n'; },
    [](auto error){ std::cout << error << '\n'; },
    10, 2);

The problem is that the onComplete and onError callbacks should be called from the main thread. I've heard this can be achieved with Qt's signals and slots system, but I can't figure out how to do it.

There is 1 best solution below

Atmo On

An easy way to do what you are asking is to combine QtConcurrent::run with a QObject that emits a signal.

The way it goes is:

  1. Start a function in a separate thread using QtConcurrent::run. It returns a QFuture that you can query for the execution status.
    As the documentation shows, you can make it run on a thread pool created beforehand.
  2. You have 2 ways to detect that the execution has ended:
    • Call QFuture::waitForFinished: it blocks the calling thread until the execution is done.
      This can be useful if you are already in a worker thread and starting tasks in other worker threads; in the main thread, however, you should avoid it because it blocks the event loop.
    • Have an object emit a signal from the worker thread and catch it in the main thread. In that case, the main thread does not get blocked.

Here is a quick example that runs a function 5 times in worker threads. A taskNo (from 1 to 5) is passed from the main thread to the worker thread through QtConcurrent::run and comes back in the signals.
The tasks simply print the id of the thread executing them. A few other lines print what is executed in the main thread.

For illustration's sake, this example works with a single worker object.
You can also work with separate worker objects, but then you need to create them as pointers inside the for loop, change their thread affinity in QtConcurrent::run (instead of just printing to the screen like I did), make the connection and only then start the work.

In any case, always explicitly mark your connection with Qt::QueuedConnection. The default, Qt::AutoConnection, already queues the call when the signal is emitted from a thread other than the receiver's, but spelling it out makes your intention clear.

In the main function, I have put comments in 3 places for illustration:

  • 1st comment: you may set up your connection so that it disconnects after the first taskCompleted signal is emitted.
  • 2nd comment: a thread pool is created in main, but it is not used unless you uncomment that argument.
    Pay close attention to which thread executes QtConcurrent::run versus QFuture::then, and to how tasks won't start until the QFuture chain is over (you can make it more obvious by adding QThread::msleep(1000); in Worker::run).
  • 3rd comment: this illustrates what QFuture::waitForFinished does. If you uncomment it, the work is still done in worker threads but with no parallelism, because the main thread is blocked before it can start the next task (see my comment above).

Worker.h

#include <QObject>

class Worker  : public QObject
{
Q_OBJECT
public:
    void run(int taskNo);
signals:
    void taskCompleted(Qt::HANDLE, int);
};

Worker.cpp

#include "Worker.h"

#include <QtCore/QDebug>
#include <QtCore/QThread>

void Worker::run(int taskNo) {
    qDebug() << "Executing task" << taskNo << "in worker thread" << QThread::currentThreadId();
    emit taskCompleted(QThread::currentThreadId(), taskNo);
}

main.cpp

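#include <QtCore/QCoreApplication>
#include <QtCore/QDebug>
#include <QtCore/QFuture>
#include <QtCore/QThread>
#include <QtCore/QThreadPool>
#include <QtConcurrent/QtConcurrentRun>

#include "Worker.h"

int main(int argc, char* argv[])
{
    // A console event loop is enough here; no widgets are created.
    QCoreApplication a(argc, argv);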

    QThreadPool pool;
    pool.setMaxThreadCount(2);

    QObject catchObject;
    Worker worker;
    QObject::connect(&worker, &Worker::taskCompleted, &catchObject,
        [](Qt::HANDLE workerThreadId, int taskNo)
        {
            qDebug() << "Message from main thread" << QThread::currentThreadId() << ": Task" << taskNo << "completed in thread" << workerThreadId;
        }
        , static_cast<Qt::ConnectionType>(Qt::QueuedConnection /*| Qt::SingleShotConnection*/) //Uncomment to catch only the first task to complete.
    );

    qDebug() << "Tasks will now be created from main thread" << QThread::currentThreadId();
    for (int taskNo = 1; taskNo <= 5; ++taskNo) {   
        QtConcurrent::run(
            /*&pool,*/ //Uncomment to work with the 2 threads of the thread pool.
            [](int taskNo) -> int
            {
                qDebug() << "Hello from worker thread" << QThread::currentThreadId() << ", starting task" << taskNo;
                return taskNo;
            }, taskNo
        ).then(
            [&worker](int taskNo) { worker.run(taskNo); }
        )/*.waitForFinished()*/; //Uncomment to make the main thread wait for the task to finish before resuming the loop.
    }

    return a.exec();
}