Preventing access to concurrent queue while rebuilding

I have a ConcurrentQueue that holds objects, along with some helper methods to enqueue to and dequeue from it. I need to remove the items that match some criteria and then rebuild the queue, since the matching items might not be at the front of the queue.

How can I do this safely?

ConcurrentQueue is supposed to be thread-safe, but I also want to prevent items from being added to or removed from the queue while the removal and rebuild are in progress.

I'm trying this, but I'm not sure if it's correct.

public class EmailRepository
{
    private readonly object _lockObj = new object();
    private ConcurrentQueue<EmailMessage> _emailMessages = new ConcurrentQueue<EmailMessage>();

    public ConcurrentQueue<EmailMessage> EmailMessages => _emailMessages;

    public void AddEmailMessage(string subject, string body)
    {
        _emailMessages.Enqueue(new EmailMessage(subject, body));
    }

    public void AddEmailMessage(EmailMessage message)
    {
        _emailMessages.Enqueue(message);
    }

    public bool RemoveEmailMessage(out EmailMessage message)
    {
        return _emailMessages.TryDequeue(out message);
    }

    public void RemoveSiteEmailsAndRebuildQueue(int key)
    {
        for (int i = 0; i < _emailMessages.Count; i++)
        {
            RemoveEmailMessage(out EmailMessage message);
            if (message.KeyValue.Equals(key))
            {
                continue;
            }

            AddEmailMessage(message);
        }
    }
}

I'm exposing _emailMessages but nothing is actually modifying the collection unless they call the appropriate methods.

I think I should be putting something around the logic in RemoveSiteEmailsAndRebuildQueue to ensure nothing can enqueue or dequeue until that method has finished executing, but I'm not sure what that something is.

There is 1 best solution below

Amir

You don't even need ConcurrentQueue here. You can use a plain Queue guarded by a simple lock object, like this:

public class EmailRepository
{
    private readonly object _lockObj = new object();
    private Queue<EmailMessage> _emailMessages = new Queue<EmailMessage>();

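    // Caution: callers must not read or modify this queue directly;
    // doing so bypasses _lockObj and is not thread-safe.
    public Queue<EmailMessage> EmailMessages => _emailMessages;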

    public void AddEmailMessage(string subject, string body)
    {
        lock (_lockObj)
        {
            _emailMessages.Enqueue(new EmailMessage(subject, body));
        }
    }

    public void AddEmailMessage(EmailMessage message)
    {
        lock (_lockObj)
        {
            AddWithoutLock(message);
        }
    }

    // Callers must already hold _lockObj.
    private void AddWithoutLock(EmailMessage message)
    {
        _emailMessages.Enqueue(message);
    }

    public bool RemoveEmailMessage(out EmailMessage message)
    {
        lock (_lockObj)
        {
            return _emailMessages.TryDequeue(out message);
        }
    }

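    public void RemoveSiteEmailsAndRebuildQueue(int key)
    {
        lock (_lockObj)
        {
            // Capture the count once: Count shrinks as matches are dropped,
            // so re-reading it each iteration would end the loop early
            // and leave some items unexamined.
            int count = _emailMessages.Count;
            for (int i = 0; i < count; i++)
            {
                // The lock is already held here, so dequeue directly.
                EmailMessage message = _emailMessages.Dequeue();
                if (message.KeyValue.Equals(key))
                {
                    continue;
                }

                AddWithoutLock(message);
            }
        }
    }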
}
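
As a variation on the same idea, here is a minimal self-contained sketch that rebuilds the queue by copying the items to keep into a fresh Queue while holding the lock, rather than rotating the existing one. It rests on some assumptions: the EmailMessage class below is a hypothetical stand-in (the real type isn't shown in the question), KeyValue is assumed to be an int, and the Snapshot method is an invented helper that replaces the public queue property so that no caller can touch the live collection outside the lock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the EmailMessage type from the question.
public sealed class EmailMessage
{
    public EmailMessage(int keyValue, string subject, string body)
    {
        KeyValue = keyValue;
        Subject = subject;
        Body = body;
    }

    public int KeyValue { get; }
    public string Subject { get; }
    public string Body { get; }
}

public sealed class EmailRepository
{
    private readonly object _lockObj = new object();
    private Queue<EmailMessage> _emailMessages = new Queue<EmailMessage>();

    public void AddEmailMessage(EmailMessage message)
    {
        lock (_lockObj)
        {
            _emailMessages.Enqueue(message);
        }
    }

    public bool RemoveEmailMessage(out EmailMessage message)
    {
        lock (_lockObj)
        {
            // Count check plus Dequeue behaves like TryDequeue and also works
            // on runtimes where Queue<T>.TryDequeue is unavailable.
            if (_emailMessages.Count == 0)
            {
                message = null;
                return false;
            }

            message = _emailMessages.Dequeue();
            return true;
        }
    }

    public void RemoveSiteEmailsAndRebuildQueue(int key)
    {
        lock (_lockObj)
        {
            // Copy every non-matching item, in order, into a new queue,
            // then swap it in. No enqueue or dequeue can interleave
            // because all other methods take the same lock.
            var kept = new Queue<EmailMessage>(_emailMessages.Count);
            foreach (EmailMessage message in _emailMessages)
            {
                if (message.KeyValue != key)
                {
                    kept.Enqueue(message);
                }
            }

            _emailMessages = kept;
        }
    }

    // Hypothetical helper: returns a copy for inspection, front of queue first.
    public EmailMessage[] Snapshot()
    {
        lock (_lockObj)
        {
            return _emailMessages.ToArray();
        }
    }
}
```

Because every public method takes the same lock, a rebuild in progress blocks producers and consumers until it finishes, and the remaining messages keep their original relative order.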