I have a queue of jobs that can be populated by multiple threads (ConcurrentQueue<MyJob>). I need to execute these jobs continuously and asynchronously (not on the main thread), but with only one thread running them at any time. I've tried something like this:
    public class ConcurrentLoop {
        private static ConcurrentQueue<MyJob> _concurrentQueue = new ConcurrentQueue<MyJob>();
        private static Task _currentTask;
        private static object _lock = new object();

        public static void QueueJob(MyJob job)
        {
            _concurrentQueue.Enqueue(job);
            checkLoop();
        }

        private static void checkLoop()
        {
            if (_currentTask == null || _currentTask.IsCompleted)
            {
                lock (_lock)
                {
                    if (_currentTask == null || _currentTask.IsCompleted)
                    {
                        _currentTask = Task.Run(() =>
                        {
                            MyJob current;
                            while (_concurrentQueue.TryDequeue(out current))
                            {
                                // Do something
                            }
                        });
                    }
                }
            }
        }
    }
In my opinion this code has a problem: if the task is just finishing (TryDequeue has returned false, but the task has not been marked as completed yet) and at that moment I get a new job, that job will not be executed. Am I right? If so, how can I fix this?
Your problem statement looks like a producer-consumer problem, with a caveat that you only want a single consumer.
There is no need to reimplement such functionality manually. Instead, I suggest using BlockingCollection<T> (by default it wraps a ConcurrentQueue<T> internally) together with a separate thread for the consumption. Note that this may or may not be suitable for your use case. Something like:
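A minimal sketch of this approach might look as follows. The class name SingleConsumerLoop, the Start method, its handler parameter, and the Id property on MyJob are hypothetical names chosen for illustration; only _blockingCollection and _consumingThread come from the text here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public class MyJob
{
    // Hypothetical payload; the question does not show MyJob's members.
    public int Id { get; set; }
}

public static class SingleConsumerLoop
{
    // With the default constructor, BlockingCollection<T> is backed by a
    // ConcurrentQueue<T>, so jobs are consumed in FIFO order.
    private static readonly BlockingCollection<MyJob> _blockingCollection =
        new BlockingCollection<MyJob>();

    private static Thread _consumingThread;

    // Starts the single consumer; 'handler' stands in for "Do something".
    public static void Start(Action<MyJob> handler)
    {
        _consumingThread = new Thread(() =>
        {
            // Blocks while the collection is empty. The loop only ends after
            // CompleteAdding() has been called and the queue has been drained.
            foreach (MyJob job in _blockingCollection.GetConsumingEnumerable())
            {
                handler(job);
            }
        });
        // Assumption: a background thread, so it does not keep the process alive.
        _consumingThread.IsBackground = true;
        _consumingThread.Start();
    }

    // May be called concurrently from any number of producer threads.
    public static void QueueJob(MyJob job)
    {
        _blockingCollection.Add(job);
    }
}
```

Because the consumer blocks inside GetConsumingEnumerable() instead of exiting when the queue is momentarily empty, there is no window in which a newly queued job can be missed, which is the race the question asks about.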
Multiple producers (tasks or threads) can add work items to _blockingCollection without any problem, and there is no need to worry about synchronizing the producers and the consumer.
When the producers are done, call _blockingCollection.CompleteAdding(). Any Add call made after that throws an InvalidOperationException, so it is advisable to stop all producers beforehand. You should probably also call _consumingThread.Join() somewhere to wait for the consuming thread to finish.
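The shutdown order described above could be sketched like this. ShutdownDemo, Run, the four producers, and the 25 items per producer are all made up for the example; integers stand in for real jobs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ShutdownDemo
{
    // Returns the number of items handled by the single consumer.
    public static int Run()
    {
        var blockingCollection = new BlockingCollection<int>();
        int processed = 0;

        var consumingThread = new Thread(() =>
        {
            foreach (int item in blockingCollection.GetConsumingEnumerable())
            {
                processed++; // only the consuming thread touches this counter
            }
        });
        consumingThread.Start();

        // Four hypothetical producers, 25 items each.
        var producers = new Task[4];
        for (int p = 0; p < producers.Length; p++)
        {
            producers[p] = Task.Run(() =>
            {
                for (int i = 0; i < 25; i++)
                {
                    blockingCollection.Add(i);
                }
            });
        }

        Task.WaitAll(producers);             // 1. every producer has finished
        blockingCollection.CompleteAdding(); // 2. no further Add calls are allowed
        consumingThread.Join();              // 3. wait until the queue is drained

        return processed;
    }
}
```

Waiting for the producers before calling CompleteAdding() avoids the exception that a late Add would throw, and Join() guarantees that every queued item has been handled before Run returns.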