Server sent events in pedestal returns empty response

71 Views Asked by At

I am currently implementing server sent events (SSE) in an web application using Clojure, Pedestal and Jetty.

When I print the message in the backend the channel is open and when I invoke io.pedestal.http.sse/send-event it returns true, however in the frontend I get no console.log() prints in Javascript in the browser console. as if there is no data received. I tested the SSE connection in Postman and it is successful but the response is (empty).

Backend:

    (def SSE-REGISTRY (atom {}))
     
     (defn get-user-channel [token]    
       (get @SSE-REGISTRY token))
     
     (defn test-print [channel]    
       (println "channel:" channel) ;; channel:#object[clojure.core.async.impl.chan...]   
       (println "channel opened:" (not (chan/closed? channel))) ;; channel opened: true  
       (println "sent-event:" 
       (sse/send-event channel "status"
         (json/write-str {:id 1
                          :workflowId 3
                          :status :GOOD}) ;; sent-event: true
     
     (defn send-sse-msg [name data id]   
       (when-let [sse-channels (vals @SSE-REGISTRY)]
         (doseq [channel sse-channels] 
           (when-not (chan/closed? channel)
             (test-print channel)
             (sse/send-event channel name data id)))))   

(def sse-route ["/rest/sse" :get 
                  (sse/start-event-stream send-sse-msg 60 100) 
                  :route-name :sse])
     
     (defn create-sse-channel []   
        (async/chan (async/sliding-buffer 100)))

enter image description here

Frontend:

 const protocol = window.location.protocol;
const host = window.location.port; 
const sseUrl = protocol + '//'+ host + '/rest/sse';
const RUN_BUTTON_STATUS_TIME = 2000;
const sseInitOptionsMap = {
                            headers: {
                                       'Content-Type': 'text/event-stream; charset=utf-8',
                                       'Connection': 'keep-alive', 
                                       'Cache-Control': 'no-cache' 
                                      },
                            withCredentials: true,
                            https: {rejectUnauthorized: true}
                          };

export const eventSource = new EventSource(sseUrl, sseInitOptionsMap);
eventSource.addEventListener('integrationStatus', sendStatusHandler);
eventSource.addEventListener('stopAction', sendStopActionHandler);

eventSource.onopen = (e) => {
  console.log("SSE connection opened:" + e);
};

eventSource.onerror = (e) => {
  console.log("error:" + e);
  if (e.readyState == EventSource.CLOSED) {
      console.log("connection closed:" + e);
  } else {
    eventSource.close();
    console.log("SSE connection closed:" + e);
  }
};

export function sendStatusHandler(event) {
  const data = JSON.parse(event.data);
  console.log("data:" + data);
  let id = data.id;
  let workflowId = data.workflowId;
  let status = data.status;
  cljsDisp("set-global-operation-status", id, workflowId, status);
  configurtorActionRunButtonStatus(id, workflowId, status);
  setTimeout(cljsDisp("get-last-run-action-data", id, workflowId, status),
             300);
}

function configurtorActionRunButtonStatus (id, workflowId, status) {
  if (status === "GOOD") {
    showStatus(id, workflowId, true);
  } else if (status === "BAD") {
    showStatus(id, workflowId, true);
    cljsDisp("configurator-error-message-show", id, workflowId, true);
  } else {
    cljsDisp("configurator-action-run-button-status-visible?", id, workflowId, false);
  }
}

export function sendStopWorkflowHandler(event) {
  const data = JSON.parse(event.data);
  console.log("data:" + data); // prints nothing
  let workflowId = data.workflowId;
  cljsDisp("stop-action-by-sse-msg", workflowId);
}

function showStatus(integrationId, operationId, showStatus) {
  setTimeout(cljsDisp("configurator-action-run-button-status-visible?", 
                      integrationId, operationId, showStatus),
             RUN_BUTTON_STATUS_TIME);
}

export function closeSse() {
  if (eventSource.readyState != eventSource.CLOSED) {
    eventSource.close();
  }
}

enter image description here

It seems I am missing something. Can you help. The problem is that there are not much examples of Clojure with Pedestal implementation of SSE. Most are in JavaScript and other languages. This is the implementation of pedestal SSE itself

1

There are 1 best solutions below

2
Phil Cooper On

There is a small example on the pedestal docs which you could add as a separate route.

The basic issue with your code is that when your create the intercepor with:

(sse/start-event-stream send-sse-msg 60 100) 

send-sse-msg function should be one that takes 2 args, the channel and the context of the original request. Context can be deconstructed as with any other interecptor.

A slightly modified version of the docs example that uses context is:

(defn sse-stream-ready
  "Starts sending counter events to client."
  [event-ch context]
  (let [count-num (Integer/parseInt
                   (get-in context [:request :query-params :counter] "5"))]
    (loop [counter count-num]
      (async/put!
       event-ch {:name "count"
                 :data (str counter ", T: "
                            (.getId (Thread/currentThread)))})
      (Thread/sleep 2000)
      (if (> counter 1)
        (recur (dec counter))
        (do
          (async/put! event-ch {:name "close" :data "I am done!"})
          (async/close! event-ch))))))

Your code looks more like you would save the event- channel into your registry (keyed by some context element?) and have another async function sending messages to all channels (chat like is what I infer there).