I am working on an Angular application.

Instead of making an API call for every record, I want to make a single API call for a batch of items once no new items have been added to the queue (an observable) for a certain period of time.

In my case, if no new items are added to the batchChangesQueue observable for more than 5 seconds, I want to execute an API call with all the items accumulated in the batch.

However, with the code I tried I always get a single item instead of an array of items. I tried various code samples that use switchMap, concatMap, toArray, etc.:

switchMap(() => this.batchChangesQueue.pipe(toArray()))

concatMap(() => this.batchChangesQueue.pipe(toArray()))

I also tried timer(5000).pipe(take(1)) and takeUntil(timer(5000)).

But nothing seems to work. Can anyone suggest what is wrong in my code, or whether there is a simpler way to achieve what I want?

My service class has the following code.

Values are pushed to changeListener$ from a different component using .next().

@Injectable({
  providedIn: 'root'
})
export class EventPublisher {
  private changeListener$ = new Subject<any>();
  private batchChangesQueue = new Subject<unknown>();
  private bufferSize = 3;

  constructor() {
    this.processBatch();
    this.subscribeToDataChanges();
  }
  private subscribeToDataChanges() {
    this.changeListener$.subscribe(async(data: {
      name: string,
      changes: any
    }) => {
      await this.saveChanges(data.name, this.convertToString(data.changes));
    });
  }
  public async processBatch() {
    this.batchChangesQueue.pipe(
      debounceTime(5000),
      switchMap(() => this.batchChangesQueue.pipe(buffer(this.batchCellChangesQueue)))
    ).subscribe({
      next: batch => {
        console.log('Processing batch:', batch);
        // Perform API calls here
      },
      error: err => console.error('Error processing batch:', err),
    });
  }
  public async saveChanges(sheetName: string, cellChanges: any) {
    this.batchChangesQueue.next({
      name: sheetName,
      cellChanges
    });
  }
}

BizzyBob On BEST ANSWER

I think you can simply use the bufferTime operator to specify both a time span and a maximum buffer size.

bufferTime(5000, null, 3), // emits array every 5000ms or when 3 items collected

Implementation could look something like this:

  private changeListener$ = new Subject<{name: string, changes: any}>();
  private changesQueue = new Subject<{sheetName: string, cellChanges: any}>();
  private bufferSize = 3;
  private bufferInterval = 5000;

  private batchChangesQueue = this.changesQueue.pipe(
    bufferTime(this.bufferInterval, null, this.bufferSize),
    filter(batch => !!batch.length) // Ignore empty batches
  );

  public processBatch() {
    this.batchChangesQueue.subscribe({
      next: batch => { console.log('Processing batch:', batch); /* Perform API calls here */ },
      error:  err => console.error('Error processing batch:', err),
    });
  }

Note: It looks like you could probably simplify this down to a single subject, since you are subscribing to one only to emit its results into the other.
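
The single-subject idea from the note could be sketched roughly as follows. This is a minimal illustration under a few assumptions, not a drop-in replacement: ChangeBatcher and CellChange are hypothetical names, the Angular @Injectable decorator is left out so the snippet stands on its own, and the import paths assume RxJS 6 or later.

```typescript
import { Observable, Subject } from 'rxjs';
import { bufferTime, filter } from 'rxjs/operators';

// Hypothetical shape of one queued change (not taken from the question).
interface CellChange {
  sheetName: string;
  cellChanges: unknown;
}

// Hypothetical service: a single Subject feeds the batching pipeline directly.
class ChangeBatcher {
  private readonly changes$ = new Subject<CellChange>();

  // Emits an array every `intervalMs`, or earlier once `maxSize` items are collected.
  readonly batches$: Observable<CellChange[]>;

  constructor(intervalMs = 5000, maxSize = 3) {
    this.batches$ = this.changes$.pipe(
      bufferTime(intervalMs, null, maxSize),
      filter((batch) => batch.length > 0) // Ignore empty batches
    );
  }

  // Callers push changes here instead of calling .next() on a second subject.
  saveChanges(sheetName: string, cellChanges: unknown): void {
    this.changes$.next({ sheetName, cellChanges });
  }
}
```

A consumer would subscribe to batches$ once and perform the API call in the next handler. Keeping the returned subscription and calling unsubscribe() on it stops the internal timer when batching is no longer needed.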

maxime1992 On

Let's start by isolating the issue and creating a minimal repro.

First of all, let's build a stream to mimic your batchChangesQueue$.

We'll create a function that returns a random integer from a given interval, then build a stream that repeatedly emits such a random value, using the value itself (between 100 and 2000) as the delay in milliseconds before it is emitted:

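import { defer, of } from 'rxjs';
import { bufferTime, concatMap, delay, repeat, tap } from 'rxjs/operators';

function randomIntFromInterval(min: number, max: number) {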
  // min and max included
  return Math.floor(Math.random() * (max - min + 1) + min);
}

const batchChangesQueue$ = defer(() => {
  const value = randomIntFromInterval(100, 2000);
  return of(value).pipe(delay(value));
}).pipe(
  tap((value) => console.log(`Queue emitted value ${value}`)),
  repeat()
);

Then, let's build a small mock for the batch processing, i.e. the part you marked with "Perform API calls here":

// simulate what could be an HTTP call to a service
function processBatch(values: number[]) {
  console.log(`Processing values at once: ${values}`);
  return of(true).pipe(
    delay(1000),
    tap(() => console.log(`Values processed`))
  );
}

Now for the real code: it buffers the queued items and passes each batch to a processing function that returns an observable:

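batchChangesQueue$
  .pipe(
    // collect everything emitted during each 5-second window
    bufferTime(5000),
    // process batches one at a time, in order
    concatMap((values) => processBatch(values))
  )
  .subscribe(); // nothing runs until the stream is subscribed to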

Here's an example of a possible output:

Queue emitted value 1438
Queue emitted value 729
Queue emitted value 361
Queue emitted value 952
Processing values at once: 1438,729,361,952
Queue emitted value 558
Queue emitted value 631
Values processed
Queue emitted value 566
Queue emitted value 726
Queue emitted value 1271
Processing values at once: 558,631,566,726,1271
Queue emitted value 670
Values processed
Queue emitted value 1923
Queue emitted value 887
Queue emitted value 1248
Queue emitted value 329
Queue emitted value 402
Processing values at once: 670,1923,887,1248,329,402
Values processed

Stackblitz live demo.
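
Note that bufferTime flushes on a fixed interval, whether or not items are still arriving. If a batch should only be sent after a quiet period, as the question describes, one possible variation is to close the buffer with a debounced copy of the same stream. The sketch below rests on some assumptions: bufferUntilQuiet is a hypothetical helper name, the source is assumed to be hot (for example a Subject) because it is subscribed to twice, and the optional scheduler parameter is only there so the timing can be controlled in tests.

```typescript
import { Observable, SchedulerLike, asyncScheduler } from 'rxjs';
import { buffer, debounceTime, filter } from 'rxjs/operators';

// Hypothetical helper: collects items and emits them as one array once the
// source has been silent for `quietMs` milliseconds.
// Assumes `source$` is hot (e.g. a Subject), since it is subscribed to twice.
function bufferUntilQuiet<T>(
  source$: Observable<T>,
  quietMs: number,
  scheduler: SchedulerLike = asyncScheduler
): Observable<T[]> {
  // The debounced stream only emits after `quietMs` without any new item...
  const quiet$ = source$.pipe(debounceTime(quietMs, scheduler));

  return source$.pipe(
    // ...and each of its emissions closes the current buffer.
    buffer(quiet$),
    // Skip empty batches
    filter((batch) => batch.length > 0)
  );
}
```

With a Subject named queue$, something like bufferUntilQuiet(queue$, 5000).pipe(concatMap((batch) => processBatch(batch))) would then send each batch only after the queue has been idle for 5 seconds. For a cold source, sharing it first (for example with share()) would likely be needed so both subscriptions see the same items.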