Functional listener with state


Let's say my pure Scala program depends on a Java service. This Java service accepts a listener and notifies me through it when some data changes.

Let's say the data is a tuple (x, y) and the Java service calls the listener whenever x or y changes, but I'm only interested in changes to x.

For this, my listener has to save the last value of x and forward the call only when oldX != x. To do that, my impure Scala listener implementation has to hold a var oldX:

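val listener = new JavaServiceListener() {

  // X and Y stand for the actual types of x and y.
  // Last x seen; None until the first update arrives.
  private var oldX: Option[X] = None

  def updated(x: X, y: Y): Unit = {
    if (!oldX.contains(x)) {
      oldX = Some(x)
      // do stuff
    }
  }
}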

javaService.register(listener)

How would I go about designing a wrapper for this kind of thing in Scala without a var or mutable collections? I can't do it at the JavaServiceListener level, since I'm bound by the method signature, so I need another layer above it that the Java listener forwards to somehow.


There are 3 best solutions below

0
AdrianS On BEST ANSWER

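I found a solution I like, using Cats and Cats Effect:

// Cats Effect 3 imports; in Cats Effect 2, Ref lives in cats.effect.concurrent
// and the unsafe runtime import is not needed.
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global // runtime required by unsafeRunSync()

trait MyListener {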
  def onChange(n: Int): Unit
}

class MyDistinctFunctionalListener(private val last: Ref[IO, Int], consumer: Int => Unit) extends MyListener {
  override def onChange(newValue: Int): Unit = {
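    // getAndSet atomically stores newValue and returns the previous value
    val program =
      last
        .getAndSet(newValue)
        .flatMap(oldValue => notify(newValue, oldValue))

    // the Java-style callback must return Unit, so the effect is run here
    program.unsafeRunSync()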
  }

  private def notify(newValue: Int, oldValue: Int): IO[Unit] = {
    if (oldValue != newValue) IO(consumer(newValue)) else IO.delay(println("found duplicate"))
  }
}

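object MyDistinctFunctionalListener {
  // Creating the Ref is itself an effect, so the constructor lives in IO.
  // Note: the initial value 0 means a first update of 0 is treated as a duplicate.
  def create(consumer: Int => Unit): IO[MyDistinctFunctionalListener] =
    Ref[IO].of(0).map(v => new MyDistinctFunctionalListener(v, consumer))
}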

val printer: Int => Unit = println(_)

val functionalDistinctPrinterIO =  MyDistinctFunctionalListener.create(printer)

functionalDistinctPrinterIO.map(fl =>
  List(1, 1, 2, 2, 3, 3, 3, 4, 5, 5).foreach(fl.onChange)
).unsafeRunSync()

More on handling shared state here: https://github.com/systemfw/scala-italy-2018

It is debatable whether this is worth it over the private var solution.
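For comparison, here is a minimal sketch of what a plain-JVM version could look like, using java.util.concurrent.atomic.AtomicReference in place of Ref. The names DistinctListener and DistinctListenerDemo are hypothetical, and starting from None instead of 0 is my own assumption, chosen so that a first value of 0 is not mistaken for a duplicate.

```scala
import java.util.concurrent.atomic.AtomicReference

// Same listener shape as in the answer above.
trait MyListener {
  def onChange(n: Int): Unit
}

// Hypothetical name: forwards a value only when it differs from the previous one.
final class DistinctListener(consumer: Int => Unit) extends MyListener {
  // None until the first value arrives, so no sentinel start value is needed.
  private val last = new AtomicReference[Option[Int]](None)

  override def onChange(newValue: Int): Unit = {
    // getAndSet swaps atomically, playing the role of Ref#getAndSet.
    val previous = last.getAndSet(Some(newValue))
    if (!previous.contains(newValue)) consumer(newValue)
  }
}

object DistinctListenerDemo {
  // Feeds values through the listener and collects what gets forwarded.
  def forwarded(inputs: List[Int]): List[Int] = {
    val seen = List.newBuilder[Int]
    val listener = new DistinctListener(x => { seen += x; () })
    inputs.foreach(listener.onChange)
    seen.result()
  }
}
```

The mutation is still there, only hidden behind the listener, which is the trade-off being weighed against the Ref version: Ref makes the allocation and every access of the state explicit in IO, at the cost of an unsafeRunSync() in the callback.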

2
St.Antario On

First of all, if you are designing a purely functional program you cannot return Unit (nor Future[Unit], because Future does not suspend side effects).

If performance is not an issue I would make use of Kleisli[Option, xType, IO[Unit]], that is, Kleisli with Option as its effect type. So the first thing you have to do is define the following (adding the appropriate types):

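import cats.data.Kleisli
import cats.effect.IO
import cats.implicits._ // Option instances on older Cats versions

// xType and yType stand for the actual types of x and y.
// Some(x) lets the pipeline continue; None short-circuits it when x is unchanged.
def updated(oldX: xType, x: xType): Kleisli[Option, xType, xType] = Kleisli liftF {
   if (x != oldX) Some(x)
   else None
}

def doStuff(x: xType, y: yType): Kleisli[Option, xType, IO[Unit]] = Kleisli pure {
    IO {
       // doStuff
    }
}

Now you can compose them in a for-comprehension, something like this: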

val result: Kleisli[Option, xType, IO[Unit]] = for{
   xx <- updated(oldX, x)
   effect <- doStuff(xx, y)
} yield effect

You can perform the stateful computation with ReaderWriterStateT, so that you keep oldX as state.
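The idea of keeping oldX as state can be sketched without any library by writing the state transition as a pure function and threading the state by hand. This is a hand-rolled stand-in for what a state monad such as ReaderWriterStateT would do for you, not its API; the names DistinctState, step and run are hypothetical.

```scala
object DistinctState {
  // The state is the last x seen, None before the first update.
  type Last[X] = Option[X]

  // Pure transition: returns the new state and, if x changed, the value to forward.
  def step[X](last: Last[X], x: X): (Last[X], Option[X]) =
    if (last.contains(x)) (last, None)
    else (Some(x), Some(x))

  // Threads the state through a sequence of updates and collects the forwarded values.
  def run[X](updates: List[X]): List[X] = {
    val (_, emitted) = updates.foldLeft((Option.empty[X], List.empty[X])) {
      case ((last, out), x) =>
        val (next, emit) = step(last, x)
        (next, emit.fold(out)(_ :: out))
    }
    emitted.reverse
  }
}
```

Here step never mutates anything, so it is trivially testable. In a real listener the state still has to live somewhere between callbacks, which is the part a Ref or a reactive stream takes care of.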

0
Karl Bielefeldt On

My preference would be to wrap it in a Monix Observable, so that you can use distinctUntilChanged to eliminate consecutive duplicates. Something like:

import monix.execution.Cancelable
import monix.reactive._

// X and Y stand for the actual types of x and y.
val observable = Observable.create[X](OverflowStrategy.Fail(10)) { sync =>
    val listener = new JavaServiceListener() {
      def updated(x: X, y: Y): Unit = {
        // push only x downstream; y is ignored
        sync.onNext(x)
      }
    }

    javaService.register(listener)
    // unregister the listener when the subscription is cancelled
    Cancelable { () => javaService.unregister(listener) }
  }

val distinctObservable = observable.distinctUntilChanged

Reactive programming allows you to use a pure model while the library handles all the difficult stuff.