I've coded the "classical" bank account kata with an F# MailboxProcessor to make it thread safe. But when I try to parallelize adding transactions to an account, it gets very slow very quickly: 10 parallel calls are responsive (2ms), 20 are not (9 seconds)! (See the last test, Account can be updated from multiple threads, below.)
Since MailboxProcessor supports 30 million messages per second (see theburningmonk's article), where does the problem come from?
// -- Domain ----
type Message =
    | Open of AsyncReplyChannel<bool>
    | Close of AsyncReplyChannel<bool>
    | Balance of AsyncReplyChannel<decimal option>
    | Transaction of decimal * AsyncReplyChannel<bool>
type AccountState = { Opened: bool; Transactions: decimal list }
type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (state: AccountState) =
            async {
                let! message = inbox.Receive()
                match message with
                | Close channel ->
                    channel.Reply state.Opened
                    return! loop { state with Opened = false }
                | Open channel ->
                    printfn $"Opening"
                    channel.Reply (not state.Opened)
                    return! loop { state with Opened = true }
                | Transaction (tran, channel) ->
                    printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
                    channel.Reply true
                    return! loop { state with Transactions = tran :: state.Transactions }
                | Balance channel ->
                    let balance =
                        if state.Opened then
                            state.Transactions |> List.sum |> Some
                        else
                            None
                    balance |> channel.Reply
                    return! loop state
            }
        loop { Opened = false; Transactions = [] }
    )
    member _.Open () = agent.PostAndReply(Open)
    member _.Close () = agent.PostAndReply(Close)
    member _.Balance () = agent.PostAndReply(Balance)
    member _.Transaction (transaction: decimal) =
        agent.PostAndReply(fun channel -> Transaction (transaction, channel))
// -- API ----
let mkBankAccount = Account
let openAccount (account: Account) =
    match account.Open() with
    | true -> Some account
    | false -> None
let closeAccount (account: Account option) =
    account |> Option.bind (fun a ->
        match a.Close() with
        | true -> Some a
        | false -> None)
let updateBalance transaction (account: Account option) =
    account |> Option.bind (fun a ->
        match a.Transaction(transaction) with
        | true -> Some a
        | false -> None)
let getBalance (account: Account option) =
    account |> Option.bind (fun a -> a.Balance())
// -- Tests ----
let should_equal expected actual =
    if expected = actual then
        Ok expected
    else
        Error (expected, actual)
let should_not_equal expected actual =
    if expected <> actual then
        Ok expected
    else
        Error (expected, actual)
let ``Returns empty balance after opening`` =
    let account = mkBankAccount() |> openAccount
    getBalance account |> should_equal (Some 0.0m)
let ``Check basic balance`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = account |> getBalance
    let updatedBalance =
        account
        |> updateBalance 10.0m
        |> getBalance
    openingBalance |> should_equal (Some 0.0m),
    updatedBalance |> should_equal (Some 10.0m)
let ``Balance can increment or decrement`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = account |> getBalance
    let addedBalance =
        account
        |> updateBalance 10.0m
        |> getBalance
    let subtractedBalance =
        account
        |> updateBalance -15.0m
        |> getBalance
    openingBalance |> should_equal (Some 0.0m),
    addedBalance |> should_equal (Some 10.0m),
    subtractedBalance |> should_equal (Some -5.0m)
let ``Account can be closed`` =
    let account =
        mkBankAccount()
        |> openAccount
        |> closeAccount
    getBalance account |> should_equal None,
    account |> should_not_equal None
#time
let ``Account can be updated from multiple threads`` =
    let account =
        mkBankAccount()
        |> openAccount
    let updateAccountAsync =
        async {
            account
            |> updateBalance 1.0m
            |> ignore
        }
    let nb = 10 // 10 is quick (2ms), 20 is so long (9s)
    updateAccountAsync
    |> List.replicate nb
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    getBalance account |> should_equal (Some (decimal nb))
#time
Your problem is that your code doesn't use Async all the way up.

Your Account class has the methods `Open`, `Close`, `Balance` and `Transaction`, and you use an `AsyncReplyChannel`, but you use `PostAndReply` to send the message. This means: you send a message to the MailboxProcessor with a channel to reply on, but at that point the method blocks the calling thread and waits synchronously for the reply.

Even with `Async.Parallel` and multiple threads, this can mean that a lot of threads block themselves. If you change all your methods to use `PostAndAsyncReply`, your problem goes away.

There are two other optimizations that can speed things up, but they are not critical in your example.
Calling `Length` on a list is bad. To calculate the length of a list, you must go through the whole list. You only use this in `Transaction` to print the length, but consider what happens when the transaction list becomes longer: you always have to go through the whole list whenever you add a transaction. This is O(N) in the size of your transaction list.

The same goes for calling `List.sum`. You have to recalculate the current balance whenever you call `Balance`. That is also O(N).

As you have a MailboxProcessor, you could also keep those two values in its state and update them with each transaction, instead of completely recalculating them again and again. Thus, they become O(1) operations.
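To make the points above concrete, here is a minimal sketch of an agent that replies through `PostAndAsyncReply` and keeps a running total and count in its state. The names `AccountMessage`, `RunningState` and `AsyncAccount` are hypothetical and chosen only for this illustration, and the open/close handling is left out to keep it short.

```fsharp
// Hypothetical names for illustration only; open/close handling is omitted.
type AccountMessage =
    | Transaction of decimal * AsyncReplyChannel<bool>
    | GetBalance of AsyncReplyChannel<decimal>
    | GetCount of AsyncReplyChannel<int>

// Running values kept in the agent state, so no list has to be traversed.
type RunningState = { Total: decimal; Count: int }

type AsyncAccount() =
    let agent =
        MailboxProcessor<AccountMessage>.Start(fun inbox ->
            let rec loop (state: RunningState) =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Transaction (amount, channel) ->
                        channel.Reply true
                        // O(1) update instead of List.sum / List.length later
                        return! loop { Total = state.Total + amount
                                       Count = state.Count + 1 }
                    | GetBalance channel ->
                        channel.Reply state.Total
                        return! loop state
                    | GetCount channel ->
                        channel.Reply state.Count
                        return! loop state
                }
            loop { Total = 0m; Count = 0 })

    // Each member returns an Async<_>; no caller thread is blocked while waiting.
    member _.Transaction (amount: decimal) =
        agent.PostAndAsyncReply(fun channel -> Transaction (amount, channel))
    member _.Balance () = agent.PostAndAsyncReply(GetBalance)
    member _.Count () = agent.PostAndAsyncReply(GetCount)

// Usage: 20 parallel transactions, then read the running total.
let account = AsyncAccount()
let total =
    async {
        let! _ =
            List.init 20 (fun _ -> account.Transaction 1.0m)
            |> Async.Parallel
        return! account.Balance()
    }
    |> Async.RunSynchronously
```

Because every member returns an `Async`, the callers compose the calls inside an `async` block and only the outermost caller decides when to run them.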
On top of that, I would change the `Open`, `Close` and `Transaction` messages to return nothing, as in my opinion it doesn't make sense for them to return anything. Your example even leaves me confused about what the `bool` return values are supposed to mean.

In the `Close` message you return `state.Opened` before you set it to false. Why?

In the `Open` message you return the negated `state.Opened`. The way you use it later just looks wrong.

If there is more meaning behind the `bool`, please make a distinct Discriminated Union out of it that describes the purpose of what it returns.

You used an `option<Account>` throughout your code. I removed it, as I don't see any purpose for it.

Anyway, here is a full example of how I would write your code so that it doesn't have the speed problems.