how to turn multiple observables into a completable?

100 Views Asked by At

I'm new to RxSwift and just inherited an old codebase, so please forgive me if this is a silly question.

In the code, data gets updated with the help of Completables and Observables. Below are two methods that loosely illustrate how that's done (obfuscated a bit for privacy purposes):

// note: `getNewData()` returns an Observable
func refreshData() -> Completable {
    dataManager.getNewData()
        .map { DataRepresentation(fromObject: $0) }
        .take(1)
        .asSingle()
        .flatMapCompletable { data in
            self.storageManager.save(data: data)
        }
}

// STORAGE MANAGER

func save(data: DataRepresentation) -> Completable {
    do {
        // PSEUDOCODE: save the data, emit an event about it if necessary.
        return Completable.completed()
    } catch let error {
        return Completable.error(error)
    }
}

So, my question is this: let's assume getNewData() allows me to pass in some parameters which will make it so that I don't just get the same data back every time. Moreover, let's say I want to call that method n times, wait for all the calls to come back, then still return a Completable from refreshData() (as to not need to change its signature). Is that sort of thing possible? I was looking into .zip but I'm not sure if it applies here. Thanks.

1

There are 1 best solutions below

11
Daniel T. On BEST ANSWER

Here is an updated solution based on all the comments:

final class Example {
    let dataManager: DataManager
    let storageManager: StorageManager

    init(dataManager: DataManager, storageManager: StorageManager) {
        self.dataManager = dataManager
        self.storageManager = storageManager
    }

    func refreshData() -> Completable {
        let myInputs = [1,2,3]
        return Observable.zip(myInputs.map(dataManager.getNewData))
            .map { $0.map(DataRepresentation.init(fromObject:)) }
            .asSingle()
            .flatMapCompletable { [storageManager] representations in
                storageManager.save(representations)
            }
    }
}

struct DataManager {
    let getNewData: (Int) -> Observable<Data>
}

struct StorageManager {
    let save: ([DataRepresentation]) -> Completable
}

struct DataRepresentation: Equatable {
    let fromObject: Data
}

Here is a test harness showing that it works (with extensive comments):

final class ExampleTests: XCTestCase {
    let scheduler = TestScheduler(initialClock: 0)
    // monitors the values sent to DataManager's `getNewData`
    lazy var dataManagerArgs = scheduler.createObserver(Int.self)
    // monitors the values sent to StorageManger's `save`.
    lazy var storageManagerArgs = scheduler.createObserver([DataRepresentation].self)
    // expect to receive an array of three `DataRepresentation` objects one second after the completable is
    //   subscribed to
    let expectedStorageManagerArgs = parseTimeline(
        "-*",
        values: [
            "*": [
                DataRepresentation(fromObject: "A".data(using: .utf8)!),
                DataRepresentation(fromObject: "B".data(using: .utf8)!),
                DataRepresentation(fromObject: "C".data(using: .utf8)!)
            ]
        ])
        .offsetTime(by: 200)

    func test_happy_path() {
        // this DataManager sends either "A", "B", or "C" and then a completed event for each subscription.
        let dataManager = createDataManager(timeline: { "-\($0)|" })

        // this StorageManager sends a completed event after it's subscribed to.
        let storageManager = createStorageManager(timeline: { _ in "-|" })

        let sut = Example(dataManager: dataManager, storageManager: storageManager)
        let result = scheduler.start {
            sut.refreshData() // call `refreshData()` and record the result of the Completable event.
        }

        // expect to receive all three values as `refreshData` is called.
        let expectedDataManagerArgs = parseTimelineEvents("*", values: { _ in [1, 2, 3] }).offsetTime(by: 100)
        XCTAssertEqual(dataManagerArgs.events, expectedDataManagerArgs[0])

        XCTAssertEqual(storageManagerArgs.events, expectedStorageManagerArgs[0])

        // expect for the `refreshData` completable to complete after the `save` completes.
        let expectedResult = parseTimeline("--|", values: { _ in fatalError() }).offsetTime(by: 200)
        XCTAssertEqual(result.events, expectedResult[0])
    }

    func test_getNewData_failure() {
        // this DataManager sends either "A", or "C", and then a completed event for subscriptions 1 and 3.
        //   it sends an error for subscription 2
        let dataManager = createDataManager(timeline: { $0 == 2 ? "-#" : "-\($0)|" })

        // this StorageManager sends a completed event after it's subscribed to.
        let storageManager = createStorageManager(timeline: { _ in "-|" })

        let sut = Example(dataManager: dataManager, storageManager: storageManager)
        let result = scheduler.start {
            sut.refreshData() // call `refreshData()` and record the result of the Completable event.
        }

        // expect to receive all three values as `refreshData` is called.
        let expectedDataManagerArgs = parseTimelineEvents("*", values: { _ in [1, 2, 3] }).offsetTime(by: 100)
        XCTAssertEqual(dataManagerArgs.events, expectedDataManagerArgs[0])

        // a single error from `getNewData` means no save events.
        XCTAssertEqual(storageManagerArgs.events, [])

        // expect for the `refreshData` completable to emit an error when the getNewData fails.
        let expectedResult = parseTimeline("-#", values: { _ in fatalError() }).offsetTime(by: 200)
        XCTAssertEqual(result.events, expectedResult[0])
    }

    func test_save_error() {
        // this DataManager sends either "A", "B", or "C" and then a completed event for each subscription.
        let dataManager = createDataManager(timeline: { "-\($0)|" })

        // this StorageManager sends an error one second after it's subscribed to.
        let storageManager = createStorageManager(timeline: { _ in "-#" })

        let sut = Example(dataManager: dataManager, storageManager: storageManager)
        let result = scheduler.start {
            sut.refreshData() // call `refreshData()` and record the result of the Completable event.
        }

        // expect to receive all three values as `refreshData` is called.
        let expectedDataManagerArgs = parseTimelineEvents("*", values: { _ in [1, 2, 3] }).offsetTime(by: 100)
        XCTAssertEqual(dataManagerArgs.events, expectedDataManagerArgs[0])

        XCTAssertEqual(storageManagerArgs.events, expectedStorageManagerArgs[0])

        // expect for the `refreshData` completable to emit an error when the save fails.
        let expectedResult = parseTimeline("--#", values: { _ in fatalError() }).offsetTime(by: 200)
        XCTAssertEqual(result.events, expectedResult[0])
    }

    func createDataManager(timeline: @escaping (Int) -> String) -> DataManager {
        DataManager(getNewData: scheduler.mock(
            args: dataManagerArgs,
            values: ["1": "A".data(using: .utf8)!, "2": "B".data(using: .utf8)!, "3": "C".data(using: .utf8)!],
            timelineSelector: timeline
        ))
    }

    func createStorageManager(timeline: @escaping ([DataRepresentation]) -> String) -> StorageManager {
        StorageManager(save: { [scheduler, storageManagerArgs] value in
            scheduler.mock(args: storageManagerArgs, values: ["N": ()], timelineSelector: timeline)(value)
                .ignoreElements()
                .asCompletable()
        })
    }
}