LiteDB Deadlocks - I may be using LiteDB wrong


So I am writing a server in C#. It has a LiteDB database to store messages in. Every time the server receives a message, it stores the message in the DB, and sends a copy to the proper destination.

Since LiteDB is supposed to be thread-safe, I don't use locks to synchronize reads/writes to the DB.

I've made a test (on a WPF app) that connects 100 clients, each in a separate task. Each client sends a message to all other clients and expects to receive a message from all other clients. I then make sure that all messages were sent/received as expected. For each client and message, I add some random delay. (More on that in a bit.)

I observe some strange behavior in the test that I would love some help with. The first 400-2000 messages are properly received by the server, inserted into the DB, and forwarded to their destination. After a while, however, everything slows down to roughly one message per second. That is, I see console output saying that a message was received and is about to be inserted into the DB, but I don't see the output saying that it was successfully inserted. I assume the insert is waiting for some internal LiteDB lock to be released.
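
To confirm that it really is the insert call that stalls (rather than the send that follows it), the call can be wrapped in a small timing helper. This is only a minimal sketch: `DbTiming`, `Timed`, `label`, and `log` are hypothetical names introduced for illustration and are not part of LiteDB or of the server code.

```csharp
using System;
using System.Diagnostics;

// Hypothetical diagnostic helper (not part of LiteDB).
public static class DbTiming
{
    // Runs a synchronous operation, logging when it starts and how long it took.
    // The "finished" line is written even if the operation throws.
    public static T Timed<T>(string label, Func<T> operation, Action<string> log)
    {
        log($"{label}: start");
        var stopwatch = Stopwatch.StartNew();
        try
        {
            return operation();
        }
        finally
        {
            stopwatch.Stop();
            log($"{label}: finished after {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}
```

In the server this could be used as `DbTiming.Timed("insert", () => _dbHandler.Insert(message), Console.WriteLine);`. A "start" line that is followed much later by its "finished" line would point at the database call itself.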

Eventually, after 1 minute of this, I get the following exception:

Exception thrown: 'LiteDB.LiteException' in LiteDB.dll
Database lock timeout when entering in transaction mode after 00:01:00

I guess this exception is thrown because there IS some deadlock. I just can't figure out what is causing it. (So I don't really care about the exception per se, I just mention it since it may help determine the cause of the actual problem.)

When I have 0 delay between messages, this issue happens a lot sooner. With a delay of about 50 ms, the test passes. I assume that I either have a deadlock in LiteDB, or that (in the low-delay case) I have too many tasks running simultaneously. (I'm not sure the latter is actually the case.)
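
One experiment that might help separate these two hypotheses is to let only one task at a time touch the database and see whether the timeout still occurs. The sketch below uses `SemaphoreSlim` for that; `SerializedGate` and `RunAsync` are hypothetical names, and the helper is a diagnostic aid under the assumption that the DB calls are synchronous, not a claim about how LiteDB should be used.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: lets only one caller at a time run an operation.
public sealed class SerializedGate : IDisposable
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    // Waits asynchronously for the gate, runs the synchronous operation,
    // and always releases the gate afterwards.
    public async Task<T> RunAsync<T>(Func<T> operation, CancellationToken token = default)
    {
        await _gate.WaitAsync(token);
        try
        {
            return operation();
        }
        finally
        {
            _gate.Release();
        }
    }

    public void Dispose() => _gate.Dispose();
}
```

With a shared `SerializedGate` instance, the insert would become `await gate.RunAsync(() => _dbHandler.Insert(message), token);`. If the timeouts disappear, contention between concurrent DB calls is the more likely cause; if they remain, the problem probably lies elsewhere.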

I've added a simplified version of my server's code.

public class Worker : BackgroundService
{
    private static readonly TimeSpan _collectionInterval = TimeSpan.FromSeconds(1);
    private static readonly TimeSpan _expirationThreshold = TimeSpan.FromMinutes(2);
    
    private readonly ConcurrentDictionary<string, Client> Clients = new();

    private readonly DBHandler _dbHandler = DBHandler.GetInstance(AppDomain.CurrentDomain.BaseDirectory);

    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        CancellationTokenSource cancellationTokenSource = CancellationTokenSource
                .CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = cancellationTokenSource.Token;
    
        List<Task> tasks = new();

        try
        {
            listener.Start();

            while (!token.IsCancellationRequested)
            {
                TcpClient tcpClient;
                try
                {
                    // Accept an incoming client connection
                    tcpClient = await listener.AcceptTcpClientAsync(token);
                }
                catch (Exception ex)
                {
                    cancellationTokenSource.Cancel();
                    break;
                }

                // Handle the client connection in a separate task
                tasks.Add(
                    HandleClientConnectionAsync(tcpClient, token)
                );
            }

            cancellationTokenSource.Cancel();
            await Task.WhenAll(tasks);
        }
        finally
        {
            _dbHandler.Dispose();
            listener.Stop();
            cancellationTokenSource.Dispose();
            token.ThrowIfCancellationRequested();
        }
    }

    private async Task HandleClientConnectionAsync(TcpClient tcpClient, CancellationToken token)
    {
        Client? client = null;

        try
        {
            client = new(tcpClient);
            client.EnableKeepAlive();

            await SendMessageAsync(new WelcomeMessage(), client, token);

            Message? message = await ReceiveMessageAsync(client, token);
            
            // ReceiveMessageAsync may return null, so check before dereferencing
            if (message is not null && Clients.TryAdd(message.FromStationID, client))
            {
                await ReceiveMessagesAsync(client, token);
            }
        }
        finally
        {
            // Clean up closed connection
            if (client is not null)
            {
                if (client.StationID is not null)
                {
                    Clients.TryRemove(client.StationID, out _);
                }
                client.CloseTcpAndStream();
            }
        }
    }
    
    private static async Task SendMessageAsync(Message message, Client client, CancellationToken token)
    {
        //use message.Serialize(), then add 4 byte header to create byte[] buffer
        
        await client.WriteAsync(buffer, token);
    }
    
    private static async Task<Message?> ReceiveMessageAsync(Client client, CancellationToken token)
    {
        // use await client.ReadAsync to receive 4 byte header
        // use await client.ReadAsync to receive the message bytes into byte[] buffer
        
        return
            buffer is null ?
            null :
            Message.Deserialize(Encoding.UTF8.GetString(buffer, 0, buffer.Length));
    }

    private async Task ReceiveMessagesAsync(Client client, CancellationToken token)
    {
        while (client.IsConnected && !token.IsCancellationRequested)
        {
            Message? message = await ReceiveMessageAsync(client, token);
            
            if (token.IsCancellationRequested || message is null)
            {
                break;
            }

            await ProcessMessageByTypeAsync(message, token);
        }
    }

    private async Task ProcessMessageByTypeAsync(Message message, CancellationToken token)
    {
        if (message is null)
        {
            return;
        }
        else if (message is AckMessage ackMessage)
        {
            await ProcessAckMessageAsync(ackMessage, token);
        }
        else if (message is DataMessage || message is FeedbackMessage)
        {
            await ProcessDataMessageAsync(message, token);
        }
        // Ignore other messages
    }

    private async Task ProcessDataMessageAsync(Message message, CancellationToken token)
    {
        if (message is null)
        {
            return;
        }

        if (message.ToStationID != null)
        {
            _dbHandler.Insert(message);

            Client client;

            if (Clients.TryGetValue(message.ToStationID, out client))
            {
                await SendMessageAsync(message, client, token);
            }
        }
    }

    private async Task ProcessAckMessageAsync(AckMessage ackMessage, CancellationToken token)
    {
        _dbHandler.DeleteMessageByID(ackMessage.AckedMessageID);
    }
}

And the relevant DB code:

private ILiteCollection<Message> GetCollection(Message message)
{
    var msgCollection = _dataBase
        .GetCollection<Message>(DB.MESSAGES);

    msgCollection.EnsureIndex((Message m) => m.MessageID);

    return msgCollection;
}

private ILiteCollection<Message> GetMessages()
{
    return GetCollection((Message)null);
}

public BsonValue Insert(Message message)
{
    message.MessageID = 0;
    return GetMessages()
        .Insert(message);
}

public bool DeleteMessageByID(BsonValue messageID)
{
    return GetMessages()
        .Delete(messageID);
}
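
Note that, as written, every `Insert` and `DeleteMessageByID` goes through `GetCollection`, so `EnsureIndex` is called again on each operation. Whether that contributes to the lock timeout is an assumption that has not been verified here, but the setup can be done once and reused. A minimal sketch with `Lazy<T>` follows; `OneTimeSetup` is a hypothetical wrapper, and the type parameter stands in for whatever the setup produces.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper: runs a setup function at most once and caches its result.
public sealed class OneTimeSetup<T>
{
    private readonly Lazy<T> _value;

    public OneTimeSetup(Func<T> setup)
    {
        // ExecutionAndPublication guarantees the factory runs at most once,
        // even when many threads request the value at the same moment.
        _value = new Lazy<T>(setup, LazyThreadSafetyMode.ExecutionAndPublication);
    }

    // The first access runs the setup; later accesses return the cached result.
    public T Value => _value.Value;
}
```

In the DB handler, the setup function would fetch the collection and call `EnsureIndex` once, and `GetMessages()` would return the cached `Value`. This assumes the collection object can safely be reused across calls.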

There is 1 answer below

Felix On

I have probably had a similar issue.

Exception was:

Collection 'MyCollection' lock timeout when entering in write mode after 00:01:00

... thrown by the same logic that was rock stable when awaited from the UI synchronization context. But as soon as a SignalR handler tried to execute this method, this exception occurred. This is where I see the similarity to your BackgroundService.

Manually awaiting this method from the UI synchronization context again fixed all issues immediately. Like so:

private void MySignalrEventHandler()
{
    Task.Factory.StartNew(
        async () =>
        {
            await MyUsuallyRockstableMethod(); // executing up to thousands of upserts on tens of collections
        },
        CancellationToken.None,
        TaskCreationOptions.None,
        taskSchedulerForDesiredSyncContext // e.g. obtained via TaskScheduler.FromCurrentSynchronizationContext() while on the UI thread, then passed around
        );
}
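
A BackgroundService has no UI synchronization context to borrow, so a rough equivalent there would be to funnel all database work onto one dedicated thread. The following is a minimal sketch of that idea, not something I have tested against LiteDB; `DedicatedThreadExecutor`, `RunAsync`, and the thread name are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs every queued operation on one dedicated thread,
// roughly what a UI synchronization context provides in a desktop app.
public sealed class DedicatedThreadExecutor : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public DedicatedThreadExecutor()
    {
        _thread = new Thread(() =>
        {
            // Blocks until work arrives; ends once CompleteAdding has been
            // called and the queue has drained.
            foreach (Action work in _queue.GetConsumingEnumerable())
            {
                work();
            }
        })
        { IsBackground = true, Name = "db-worker" };
        _thread.Start();
    }

    // Queues a synchronous operation and returns a task for its result.
    public Task<T> RunAsync<T>(Func<T> operation)
    {
        var done = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Add(() =>
        {
            try { done.SetResult(operation()); }
            catch (Exception ex) { done.SetException(ex); }
        });
        return done.Task;
    }

    // Stops accepting work, waits for queued operations to finish, then cleans up.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join();
        _queue.Dispose();
    }
}
```

In the server from the question, the insert would then look like `await executor.RunAsync(() => _dbHandler.Insert(message));`, with one shared `executor` for all DB calls.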

Hope it helps!