I have implemented notification system in SpringBoot using WebFlux, below is my sse endpoint
@GetMapping(path = "/backoffice/sse/notifications")
public Flux<ServerSentEvent<NotificationData>> sse()
{
return this.sseNotificationService.subscribe();
}
And here is the implementation of SSENotificationService
@Component
public class SSENotificationService
{
private static final Logger LOGGER = LoggerFactory.getLogger(SSENotificationService.class);
private final Flux<ServerSentEvent<NotificationData>> notificationFlux;
private final NotificationRepository notificationRepository;
private final UserService userService;
public SSENotificationService(
NotificationRepository notificationRepository,
UserService userService
)
{
this.notificationRepository = notificationRepository;
this.userService = userService;
notificationFlux = Flux.push(this::generateNotifications);
}
@Nonnull
private Flux<ServerSentEvent<NotificationData>> keepAlive(
@Nonnull Duration duration,
@Nonnull Flux<ServerSentEvent<NotificationData>> data,
@Nonnull String id
)
{
Flux<ServerSentEvent<NotificationData>> heartBeat = Flux.interval(duration)
.map(_ -> ServerSentEvent.<NotificationData>builder()
.event("comment")
.comment(STR."keep alive for: \{id}")
.build())
.doFinally(_ -> LOGGER.info("Heartbeat closed for id: {}", id));
return Flux.merge(heartBeat, data);
}
@Nonnull
public Flux<ServerSentEvent<NotificationData>> subscribe()
{
var userIdOrSystem = userService.userIdOrSystem();
return keepAlive(Duration.ofSeconds(7), notificationFlux, userIdOrSystem);
}
private void generateNotifications(@Nonnull FluxSink<ServerSentEvent<NotificationData>> sink)
{
var userIdOrSystem = userService.userIdOrSystem();
Flux.interval(Duration.ofSeconds(5))
.flatMap(_ -> {
var pendingNotifications = this.notificationRepository.pendingNotifications(userIdOrSystem);
return Flux.fromIterable(pendingNotifications)
.map(notificationData -> {
this.notificationRepository.updateNotificationStatus(notificationData.id());
return ServerSentEvent.<NotificationData>builder()
.id(notificationData.id())
.data(notificationData)
.event("message")
.build();
});
})
.doOnNext(sink::next)
.onErrorResume(throwable -> {
LOGGER.error("An error occurred while processing notifications: {}", throwable.getMessage());
return Flux.empty();
})
.doFinally(signalType -> LOGGER.debug(signalType.toString()))
.takeWhile(_ -> !sink.isCancelled())
.subscribe();
}
}
This imlementation works fine when I test with postman,

But I'm having trouble handling notifications on client side with React.js, I have tried everything, but cannot receive events, sse.onopen function gets called when i call the endpoint, but onmessage won't work for some reason, any ideas why ?