I am trying to implement Server-Sent Events (SSE) with FastAPI, with the aim of having a single Redis subscriber that broadcasts data to every EventSource client. My first version creates a new subscription for each request:
from sse_starlette import EventSourceResponse
from db.database import REDIS_SERVER
from dotenv import load_dotenv
from fastapi import APIRouter

load_dotenv()
router = APIRouter()

async def subscribe_to_redis_channel(channel_id: str):
    pubsub = REDIS_SERVER.pubsub()
    await pubsub.subscribe(channel_id)
    async for message in pubsub.listen():
        if message["type"] == "message":
            message["data"] = message["data"].decode("utf-8")
            message["channel"] = message["channel"].decode("utf-8")
            yield message['data']

@router.get("/ltp-stream/{channel}")
def stream(channel: str):
    return EventSourceResponse(subscribe_to_redis_channel(channel), media_type="text/event-stream")
To share a single subscriber, I then tried caching the pubsub object in a module-level global:
from sse_starlette import EventSourceResponse
from db.database import REDIS_SERVER
from dotenv import load_dotenv
from fastapi import APIRouter

load_dotenv()
router = APIRouter()
global_subscriber = None

async def subscribe_to_redis_channel(channel_id: str):
    global global_subscriber
    if not global_subscriber:
        pubsub = REDIS_SERVER.pubsub()
        await pubsub.subscribe(channel_id)
        global_subscriber = pubsub
    else:
        pubsub = global_subscriber
    async for message in pubsub.listen():
        if message["type"] == "message":
            message["data"] = message["data"].decode("utf-8")
            message["channel"] = message["channel"].decode("utf-8")
            yield message['data']

@router.get("/ltp-stream/{channel}")
def stream(channel: str):
    return EventSourceResponse(subscribe_to_redis_channel(channel), media_type="text/event-stream")
This works fine until the request is stopped, at which point I get:
ERROR: ASGI callable returned without completing response.
After that, the error persists on every request and I can no longer access the route.
Is it possible to mimic the behaviour of the first version while creating only a single connection to the Redis channel? If so, please share your thoughts on how I can achieve that, or point out what I am doing wrong.
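For reference, the pattern I think I am after is a fan-out: one task is the only reader of the shared source, and it copies every message into a per-client `asyncio.Queue`, so a client disconnecting only discards its own queue. Below is a minimal, untested-against-Redis sketch of that idea using only the standard library. The `Broadcaster` class and its `run`/`subscribe` methods are hypothetical names of my own, not part of any library, and the `source` argument stands in for whatever `pubsub.listen()` yields in the real app.

```python
import asyncio
from typing import AsyncIterator


class Broadcaster:
    """Hypothetical helper: one reader task fans messages out to many clients."""

    def __init__(self) -> None:
        # One queue per connected client.
        self._clients = set()

    async def run(self, source: AsyncIterator[str]) -> None:
        # The only consumer of the shared source
        # (assumption: in the real app this would wrap pubsub.listen()).
        async for message in source:
            for queue in list(self._clients):
                queue.put_nowait(message)

    async def subscribe(self) -> AsyncIterator[str]:
        # Each request gets its own queue, so closing this generator
        # never touches the shared source.
        queue: asyncio.Queue = asyncio.Queue()
        self._clients.add(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            self._clients.discard(queue)
```

My assumption is that the route would then return `EventSourceResponse(broadcaster.subscribe())`, with `broadcaster.run(...)` started once as a background task at application startup. This sketch handles a single channel only; supporting the `{channel}` path parameter would presumably need one broadcaster per channel.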