Combine: Using DispatchQueue.global() as a delay scheduler causes missing published values

I've noticed that my Combine pipeline sometimes gets "stuck" when I use "DispatchQueue.global()" as the scheduler for "delay".

for i in 0..<100 {
   let scheduler = DispatchQueue.global()

   let timeoutPublisher = Just<Int>(0)
       .delay(for: .seconds(0.1), scheduler: scheduler)

   let anotherJust = Just(1)
   var ok = false            
   
   self.cancellable = anotherJust
         .merge(with: timeoutPublisher)
         .sink { [weak self] state in                    
            if state == 0 {
               ok = true 
            }
         }
   
   try? await Task.sleep(for: .seconds(1))
   // ...
}

In this code snippet, "ok" is sometimes never set to true. It happens only occasionally, so I suspect a threading issue. When the pipeline gets into that invalid state, the value "0" is never delivered to the sink.

When I switch scheduler to "DispatchQueue.main", it always works.

Could anyone explain what I am missing here?

Best answer, by Scott Thompson

The problem has to do with timing, specifically with when the pipelines publish their "finished" events.

I took your code and wrapped it up in an XCTestCase:

import Combine
import XCTest

class TestDelay : XCTestCase {
  func testDelayOperator() {
    print("running the test")
    let scheduler = DispatchQueue.global()

    let timeoutPublisher = Just(0)
      .print("just 0")
      .delay(for: .seconds(0.1), scheduler: scheduler)

    let expectZero = expectation(description: "Expect 0 to be received.")
    let cancellable = Just(1)
      .merge(with: timeoutPublisher)
      .print("just 1")
      .sink { completion in
        switch completion {
        case .finished:
          print("finished")
        case .failure(_):
          print("failure")
        }
      } receiveValue: { state in
        print("State is \(state)")
        if state == 0 {
          expectZero.fulfill()
        }
      }

    wait(for: [expectZero], timeout:  1.0)
  }
}

I then started a timer that fires every second and ran the test on each tick, watching the output until a run failed:

var cancellable = Timer.publish(every: 1.0, on: .main, in: .common)
  .autoconnect()
  .sink { _ in
    TestDelay.defaultTestSuite.run()
  }

Here's the transcript from a successful test:

Test Suite 'TestDelay' started at 2024-01-29 15:01:17.261.
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' started.
running the test
just 1: receive subscription: (Merge)
just 1: request unlimited
just 1: receive value: (1)
State is 1
just 0: receive subscription: (Just)
just 0: request unlimited
just 0: receive value: (0)
just 0: receive finished
just 1: receive value: (0)
State is 0
just 1: receive finished
finished
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' passed (0.103 seconds).
Test Suite 'TestDelay' passed at 2024-01-29 15:01:17.364.
     Executed 1 test, with 0 failures (0 unexpected) in 0.103 (0.103) seconds

You can see that the "just 1" pipeline received the value 0 before it received its own finished event, so the test succeeded.

Here's an example of a failed test:

Test Suite 'TestDelay' started at 2024-01-29 15:01:12.263.
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' started.
running the test
just 1: receive subscription: (Merge)
just 1: request unlimited
just 1: receive value: (1)
State is 1
just 0: receive subscription: (Just)
just 0: request unlimited
just 0: receive value: (0)
just 0: receive finished
just 1: receive finished
finished
<unknown>:0: error: -[__lldb_expr_96.TestDelay testDelayOperator] : Asynchronous wait failed: Exceeded timeout of 1 seconds, with unfulfilled expectations: "Expect 0 to be received.".
Test Case '-[__lldb_expr_96.TestDelay testDelayOperator]' failed (1.021 seconds).
Test Suite 'TestDelay' failed at 2024-01-29 15:01:13.287.
     Executed 1 test, with 1 failure (0 unexpected) in 1.021 (1.024) seconds

Notice that both finished events arrived before the "just 1" pipeline could receive the value 0, so that value was never delivered.

Global dispatch queues are concurrent, not serial, so I suspect that the delay operator schedules separate blocks for the "publish a 0" event and the "publish a finish" event, and that those blocks can run in parallel. Your test succeeds or fails depending on which of them gets delivered first. The main queue is a serial queue, so the problem goes away when you use it. Try using a serial queue that you create yourself as the scheduler for your delay and see if that resolves your problem.
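
For reference, here is a minimal sketch of the serial-queue suggestion. The helper name receivedValues(delayingOn:) and the queue label are made up for illustration and are not part of the original code. It assumes the explanation above is right, so treat it as something to experiment with rather than a confirmed fix. It runs the same merged pipeline once and returns whatever the sink received, so you can compare a private serial queue against DispatchQueue.global().

```swift
import Combine
import Foundation

/// Hypothetical helper: runs the merged pipeline once, using `queue` as the
/// scheduler for `delay`, and returns the values the sink received, in order.
/// Do not pass `.main` when calling from the main thread: the semaphore wait
/// below would block the very queue the delayed events need.
func receivedValues(delayingOn queue: DispatchQueue) -> [Int] {
    var received: [Int] = []
    let lock = NSLock()                        // guards `received` across threads
    let finished = DispatchSemaphore(value: 0)

    let timeoutPublisher = Just(0)
        .delay(for: .seconds(0.1), scheduler: queue)

    let cancellable = Just(1)
        .merge(with: timeoutPublisher)
        .sink(
            receiveCompletion: { _ in finished.signal() },
            receiveValue: { value in
                lock.lock()
                received.append(value)
                lock.unlock()
            }
        )

    // Keep the subscription alive while blocking the calling thread until
    // the pipeline finishes, or give up after one second.
    withExtendedLifetime(cancellable) {
        _ = finished.wait(timeout: .now() + 1.0)
    }

    lock.lock()
    defer { lock.unlock() }
    return received
}

// DispatchQueue(label:) creates a serial queue unless you pass `.concurrent`.
let serialQueue = DispatchQueue(label: "com.example.delay-scheduler")
print(receivedValues(delayingOn: serialQueue))
print(receivedValues(delayingOn: .global()))
```

If the explanation holds, the serial-queue call should report both 1 and 0 on every run, while the global-queue call will occasionally report only 1. Run it in a loop to see the difference, since the failure is intermittent.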