I am working on an Angular application.
Instead of making an API call for every record, I want to make a single API call for a batch of items once no new items have been added to the queue (an observable queue) for a certain period.
In my case, if no new items are added to the batchChangesQueue observable for more than 5 seconds, I want to execute an API call with all the items accumulated in the batch.
However, with the code I tried I always get a single item instead of an array of items. I tried various code samples that use switchMap, concatMap, toArray, etc.:
switchMap(() => this.batchChangesQueue.pipe(toArray()))
concatMap(() => this.batchChangesQueue.pipe(toArray()))
I also tried timer(5000).pipe(take(1)) and takeUntil(timer(5000)).
But nothing seems to work. Can anyone suggest what is wrong in my code, or whether there is a simpler way to achieve what I want?
My service class has the following code.
The changeListener$ subject receives its values from a different component, which calls .next() on it.
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';
import { buffer, debounceTime, switchMap } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class EventPublisher {
  private changeListener$ = new Subject<any>();
  private batchChangesQueue = new Subject<unknown>();
  private bufferSize = 3;

  constructor() {
    this.processBatch();
    this.subscribeToDataChanges();
  }

  // Forwards every incoming change into the batch queue.
  private subscribeToDataChanges() {
    this.changeListener$.subscribe(async (data: { name: string, changes: any }) => {
      await this.saveChanges(data.name, this.convertToString(data.changes));
    });
  }

  // Attempt at batching: wait for 5 seconds of silence, then collect the items.
  public async processBatch() {
    this.batchChangesQueue.pipe(
      debounceTime(5000),
      switchMap(() => this.batchChangesQueue.pipe(buffer(this.batchChangesQueue)))
    ).subscribe({
      next: batch => {
        console.log('Processing batch:', batch);
        // Perform API calls here
      },
      error: err => console.error('Error processing batch:', err),
    });
  }

  // Pushes a single change onto the batch queue.
  public async saveChanges(sheetName: string, cellChanges: any) {
    this.batchChangesQueue.next({
      name: sheetName,
      cellChanges
    });
  }
}
I think you can simply use the bufferTime operator to specify a time span and a maximum buffer size. An implementation could look something like this:
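A minimal sketch of the bufferTime idea, assuming RxJS 6 or later. The helper name `batchByTime` and its default values are illustrative and do not come from the original code. Note that bufferTime closes a buffer on a fixed time window (or when the size limit is reached), not after a quiet period, so it only approximates the "no new items for 5 seconds" requirement.

```typescript
import { Observable } from 'rxjs';
import { bufferTime, filter } from 'rxjs/operators';

/**
 * Hypothetical helper: groups values from `source$` into arrays.
 * A batch is emitted every `windowMs` milliseconds, or earlier as soon as
 * `maxSize` values have accumulated.
 */
export function batchByTime<T>(
  source$: Observable<T>,
  windowMs = 5000,
  maxSize = 3,
): Observable<T[]> {
  return source$.pipe(
    // Second argument null: no separate buffer-creation interval, so a new
    // buffer is opened whenever the previous one closes.
    bufferTime(windowMs, null, maxSize),
    // bufferTime emits an empty array when a window passes without values;
    // skip those so no API call is made for an empty batch.
    filter(batch => batch.length > 0),
  );
}
```

In the service from the question, this would presumably be used as `batchByTime(this.batchChangesQueue, 5000, this.bufferSize).subscribe(batch => ...)`, with the API call made inside the subscriber.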
Note: It looks like you could probably simplify this down to a single subject, since you are subscribing to one subject only to emit its values into the other one.
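As a rough illustration of the single-subject idea, the sketch below also swaps bufferTime for buffer with a debounceTime notifier, which flushes only after a quiet period and so is closer to what the question describes. It is an assumption-laden sketch, not the original answer's code: the class name `BatchingEventPublisher`, the `CellChange` shape, `publish` and `batches$` are all hypothetical, and `@Injectable` is omitted so the snippet runs outside Angular. In a real Angular service the constructor parameters would have to become constants or injection tokens.

```typescript
import { Observable, Subject, asyncScheduler } from 'rxjs';
import type { SchedulerLike } from 'rxjs';
import { buffer, debounceTime, filter } from 'rxjs/operators';

// Hypothetical shape of a queued change; the question uses `any`.
interface CellChange {
  name: string;
  changes: unknown;
}

export class BatchingEventPublisher {
  // Single subject: components push changes here through publish().
  private readonly changes$ = new Subject<CellChange>();

  // Emits one array per burst of changes, once the burst has gone quiet.
  readonly batches$: Observable<CellChange[]>;

  constructor(quietMs = 5000, scheduler: SchedulerLike = asyncScheduler) {
    this.batches$ = this.changes$.pipe(
      // The notifier fires once no change has arrived for `quietMs`;
      // buffer() then emits everything collected since the last flush.
      // This relies on changes$ being hot (a Subject), so both
      // subscriptions see the same emissions.
      buffer(this.changes$.pipe(debounceTime(quietMs, scheduler))),
      // Defensive: never hand an empty batch to the API call.
      filter(batch => batch.length > 0),
    );
  }

  publish(change: CellChange): void {
    this.changes$.next(change);
  }
}
```

A consumer would subscribe to `batches$` and make one API call per emitted array. The scheduler parameter exists only so the timing can be driven by a virtual clock in tests.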