Process unknown number of observables in RxJS pipe

232 Views Asked by At

I make thousands of calls to my server but in order to avoid overloading, I have put in place a concurrency limit of 10 with RxJS:

const calls = new Subject();
calls.pipe(
    mergeAll(10)
).subscribe();

while(int < unknown_number){
    calls.next(new Observable((observer) => {
        // Call the server here
    }))
}

The problem is that I don't know how many calls will be made and I need to know when the job is done. One approach is to get when nothing is left in the queue for 5 seconds or so.

How can I do that?

3

There are 3 best solutions below

0
Picci On

After reading your comment, I think the answer lays in issuing a complete command against the Subject as soon as we know there is no more data to be read from the DB.

So, is a sort of pseudo-code, this could be the draft for the solution

// callToServer is a function that calls the server and returns an Observable
const callToServer: Observable<any> = (input) => httpClient.post(url, input);

const calls = new Subject<any>();
calls.pipe(
    // margeMap allows to set concurrency level as second parameter
    mergeMap(input => callToServer(input), 10)
).subscribe(
   {
      next: response => {
        // do something with the response
      },
      error: err => {
        // manage error occurrences
      },
      complete: () => {
        // here you come when the Subject completes
      }
   }
);

const cursor = db.readFromMyTable();

while(cursor has still data){
    const data = cursor.getNext(100);
    // for each record issue a next against the calls Subject
    data.forEach(d => calls.next(d));
}

// when there are no more records completes the calls Subject
0
Християн Христов On

After i read trough you comment of the flow, I think that you don't need merge map, as each new request will be after the previous one is completed, so i will suggest the following approach;

let {
  of,
  Subject
} = rxjs;
let {
  switchMap,
  delay,
  scan
} = rxjs.operators;

const db = [1, 2, 3, 4, 5, 6, 7];

const requester = new Subject();

const mockRequest = ({ page, items }) =>
  of(db.slice(page * items, (page + 1) * items)).pipe(delay(500));

const ITEMS_PER_PAGE = 2;
requester
  .pipe(
    switchMap(pageData => mockRequest(pageData)),
    scan(
      (acc, x) => {
        const newIdx = +acc[0] + 1;
        return [newIdx, x];
      },
      [0, []]
    )
  )
  .subscribe(([i, x]) => {
    if (x.length !== 0) {
      requester.next({ page: i, items: ITEMS_PER_PAGE });
      console.log(x);
    } else {
      console.log("Done and ready to unsubscribe");
    }
  });
requester.next({ page: 0, items: ITEMS_PER_PAGE });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.js"></script>
(You can run the code snippet directly here or here stackBlitz)

The thing I'm doing here is calling the db while there items inside of it. In your case you should just call each time for 100 elements, while there are elements returned from the db.

0
Roberto Zvjerković On

Your don't really need to create your own streams, just:

Start with a subject that will call once:

const call$ = new BehaviorSubject(0);

call$.pipe(
    concatMap(call => {
        return callYourDatabase(call); // Pagination or whatever
    }),
    tap(res => {
        if (res) {
            call$.next(db); // Call again after completion if the previous has values
        }
    })
)