Neo4j throwing a ProtocolException when cancelling a transaction


I'm using Neo4j as the database for an API. If a user makes repeated requests with the same or similar parameters (for example, requesting many pages of data by scrolling through a list, when only the last page is actually required), I want to cancel the previous query and run the latest. I have a working solution in which cancellation is managed by a store of CancellationTokenSource instances that request cancellation. The query code uses a single Driver instance and a function to read the data:

//get the session
let getSession (sessionMode : AccessMode) (idriver:IDriver) =
        idriver.AsyncSession(Action<SessionConfigBuilder>(fun b -> b.WithDefaultAccessMode(sessionMode) |> ignore))

let readData
    (timezone : Timezone)
    idriver
    typeConversions
    (query:ReadQuery)
    (queryParams: QueryParams)
    : Async<Result<seq<Val list>, GraphError>> =
    asyncResult
        {
            use session = getSession AccessMode.Read idriver

            //provides cancellation
            let! token = Async.CancellationToken
            use! holder = Async.OnCancel(fun () -> session.CloseAsync() |> ignore)

            return!
                session.ExecuteReadAsync<Result<Val list seq, GraphError>>(fun tx ->
                    task {
                        try
                            let resultProcessF = asTypes (Timezone.value timezone) typeConversions
                            let! runResult = tx.RunAsync(ReadQuery.value query, queryParams)
                            let! results = runResult.ToListAsync(token)
                            let resultsSeq = results |> Seq.map resultProcessF
                            return Ok resultsSeq
                        with
                        | ex ->
                            return
                                Error (GraphError.ReadError
                                    {
                                        Error = (sprintf "Failed to run read data with query: %s" (ReadQuery.value query))
                                        ErrorDetails = ex.Message
                                    })
                    })
        }

Cancellation is provided as shown: the async function passes the token to ToListAsync, and the Async.OnCancel(fun () -> session.CloseAsync() |> ignore) handler closes the session and rolls back the transaction when cancellation is requested.

This usually works and can roll back tens or hundreds of requests successfully. However, every so often I receive an exception that I'm struggling to diagnose:

 ---> System.AggregateException: One or more errors occurred. (Message 'ROLLBACK' cannot be handled by a session in the READY state.)
 ---> Neo4j.Driver.ProtocolException: Message 'ROLLBACK' cannot be handled by a session in the READY state.

The exception seems straightforward enough: the session is READY but the code is trying to ROLLBACK, presumably as a result of the session.CloseAsync() call. There doesn't seem to be a mechanism for checking the session state before calling CloseAsync() to prevent this, and slapping a try/catch around it feels like a bit of a bodge. I'm using the default Bolt thread configuration.

What would cause this exception if my assumption is incorrect, and how do I resolve this issue?

Answer by just_another_dotnet_dev

I think the issue is that you are creating a race: the cancellation token is passed to runResult.ToListAsync(token), and the Async.OnCancel registration also closes the session.

  • When the session is closed, it attempts to roll back and close the current transaction.
  • When the transaction functions (ExecuteReadAsync etc.) catch an exception, they also roll back the transaction themselves.
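
To make the double rollback concrete, here is a toy model of the two paths described above. It is only a sketch: ToySession and SessionState are hypothetical types invented for illustration and are not part of Neo4j.Driver, and the real driver's internals may differ. It assumes that a ROLLBACK is only valid while a transaction is open, which is what the error message suggests.

```fsharp
// Toy model only: these types are hypothetical and are NOT part of Neo4j.Driver.
type SessionState =
    | Ready          // no open transaction
    | InTransaction  // a transaction is open

type ToySession() =
    let mutable state = InTransaction
    member _.State = state
    // Assumption: ROLLBACK is only accepted while a transaction is open.
    member _.Rollback() =
        match state with
        | InTransaction -> state <- Ready
        | Ready -> failwith "Message 'ROLLBACK' cannot be handled by a session in the READY state."

let session = ToySession()

// Path 1: the transaction function sees the cancellation exception and rolls back.
session.Rollback()

// Path 2: the OnCancel handler closes the session, which tries to roll back again.
let secondRollback =
    try
        session.Rollback()
        Ok ()
    with ex ->
        Error ex.Message
```

In this model the second call fails because the first one has already returned the session to READY. Whichever path runs second is the one that hits the error, which would fit the intermittent nature of the exception.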

I would recommend passing the token to runResult.ToListAsync(token) only, and dropping Async.OnCancel(fun () -> session.CloseAsync() |> ignore), so that only one code path issues the rollback.

This might not be enough for you, because the driver only checks the cancellation token while results are coming back, so queries that take a while to respond will still block (this is an open issue for the driver). If you also need to cancel queries that are still waiting for a response, you could try racing the call against the token:

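open System.Threading
open System.Threading.Tasks

// Waits for t, but fails with OperationCanceledException as soon as the ambient token is cancelled.
// Written as async (not task) so that Async.CancellationToken is the caller's token.
let racing (t: Task) : Async<unit> =
    async {
        let! ct = Async.CancellationToken
        // Never completes by itself; it only ends when ct is cancelled.
        // Bound with let, not use: disposing a Task that has not completed throws.
        let race = Task.Delay(Timeout.Infinite, ct)
        let! _ = Task.WhenAny(t, race) |> Async.AwaitTask
        ct.ThrowIfCancellationRequested()
        // t has completed here; awaiting it surfaces any exception it raised.
        return! Async.AwaitTask t
    }

let racingResult<'a> (t: Task<'a>) : Async<'a> =
    async {
        let! ct = Async.CancellationToken
        let race = Task.Delay(Timeout.Infinite, ct)
        let! _ = Task.WhenAny(t :> Task, race) |> Async.AwaitTask
        ct.ThrowIfCancellationRequested()
        return! Async.AwaitTask t
    }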
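
As a rough check of how such a racing helper behaves, the sketch below runs one without any database involved. The names raceWithCancellation, slowQuery and runWithTimeout are hypothetical stand-ins made up for this example: raceWithCancellation follows the same idea as racingResult, and slowQuery is just a delayed task standing in for a query.

```fsharp
open System
open System.Threading
open System.Threading.Tasks

// Hypothetical stand-in for the racing helper, repeated so this snippet runs on its own.
let raceWithCancellation (work: Task<'a>) : Async<'a> =
    async {
        let! ct = Async.CancellationToken
        // Only ends when ct is cancelled.
        let never = Task.Delay(Timeout.Infinite, ct)
        let! _ = Task.WhenAny(work :> Task, never) |> Async.AwaitTask
        ct.ThrowIfCancellationRequested()
        return! Async.AwaitTask work
    }

// Hypothetical stand-in for a slow query: completes after delayMs with a fixed value.
let slowQuery (delayMs: int) : Task<int> =
    task {
        do! Task.Delay(delayMs)
        return 42
    }

// Runs the race under a token that is cancelled after cancelAfterMs.
let runWithTimeout (cancelAfterMs: int) (work: Task<int>) : Result<int, string> =
    use cts = new CancellationTokenSource(cancelAfterMs)
    try
        Ok (Async.RunSynchronously(raceWithCancellation work, cancellationToken = cts.Token))
    with
    | :? OperationCanceledException -> Error "cancelled"

// The work finishes long before the token is cancelled.
let fast = runWithTimeout 5000 (slowQuery 10)
// The token is cancelled long before the work finishes.
let slow = runWithTimeout 50 (slowQuery 5000)
```

Note that the race only stops the caller from waiting. The underlying task keeps running until it finishes by itself, so in the real case the query itself is not aborted by this helper alone.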

Furthermore, you should avoid catching exceptions inside the transaction lambda, because the driver uses them to decide whether it should roll back. I think a better solution is to move the try/catch around the call, so you should end up with something like:

let readData
    (timezone : Timezone)
    idriver
    typeConversions
    (query:ReadQuery)
    (queryParams: QueryParams)
    : Async<Result<seq<Val list>, GraphError>> =
    asyncResult
        {
            use session = getSession AccessMode.Read idriver

            //provides cancellation
            let! token = Async.CancellationToken

            try
                // No try/with inside the lambda: exceptions must reach the driver so it can roll back.
                return!
                    session.ExecuteReadAsync<Result<Val list seq, GraphError>>(fun tx ->
                        task {
                            let resultProcessF = asTypes (Timezone.value timezone) typeConversions
                            let fetch =
                                task {
                                    let! result = tx.RunAsync(ReadQuery.value query, queryParams)
                                    return! result.ToListAsync(token)
                                }
                            // Start the race with the caller's token; a plain let! on an Async
                            // inside task would run it under the default token instead.
                            let! results = Async.StartImmediateAsTask(racingResult fetch, cancellationToken = token)
                            let resultsSeq = results |> Seq.map resultProcessF
                            return Ok resultsSeq
                        })
            with
            | ex ->
                return!
                    Error (GraphError.ReadError
                        {
                            Error = (sprintf "Failed to run read data with query: %s" (ReadQuery.value query))
                            ErrorDetails = ex.Message
                        })
        }

I'm not an F# user, but I do hope that helps.
