Single Global Redis Subscriber

I am trying to implement Server-Sent Events (SSE) with FastAPI, and I want a single, shared Redis subscriber that broadcasts data to the EventSource clients. My first version subscribes to Redis separately for each request:

from sse_starlette import EventSourceResponse
from db.database import REDIS_SERVER

from dotenv import load_dotenv
from fastapi import APIRouter

load_dotenv()

router = APIRouter()


async def subscribe_to_redis_channel(channel_id: str):
    pubsub = REDIS_SERVER.pubsub()
    await pubsub.subscribe(channel_id)

    async for message in pubsub.listen():
        if message["type"] == "message":
            message["data"] = message["data"].decode("utf-8")
            message["channel"] = message["channel"].decode("utf-8")
            yield message['data']


@router.get("/ltp-stream/{channel}")
def stream(channel: str):
    return EventSourceResponse(subscribe_to_redis_channel(channel), media_type="text/event-stream")

To share a single subscriber, I tried caching the pubsub object in a module-level global:

from sse_starlette import EventSourceResponse
from db.database import REDIS_SERVER

from dotenv import load_dotenv
from fastapi import APIRouter

load_dotenv()

router = APIRouter()
global_subscriber = None


async def subscribe_to_redis_channel(channel_id: str):
    global global_subscriber
    if not global_subscriber:
        pubsub = REDIS_SERVER.pubsub()
        await pubsub.subscribe(channel_id)
        global_subscriber = pubsub
    else:
        pubsub = global_subscriber

    async for message in pubsub.listen():
        if message["type"] == "message":
            message["data"] = message["data"].decode("utf-8")
            message["channel"] = message["channel"].decode("utf-8")
            yield message['data']


@router.get("/ltp-stream/{channel}")
def stream(channel: str):
    return EventSourceResponse(subscribe_to_redis_channel(channel), media_type="text/event-stream")

This works fine until the request is stopped, and then I get:

ERROR: ASGI callable returned without completing response.

Afterwards the error is persistent and I can no longer access the route.

Is it possible to mimic the behaviour of the first code while creating only a single connection to the Redis channel? If so, please share your thoughts on how I can achieve that, or point out what I am doing wrong.
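
For reference, one possible direction is a fan-out pattern: a single background task reads from the shared subscriber, and each request gets its own `asyncio.Queue` instead of iterating `pubsub.listen()` directly. In the second snippet above, every request iterates the same `pubsub.listen()` and only the first request's channel is ever subscribed, which may be related to the error. The sketch below is an assumption-laden illustration, not a confirmed fix: `Broadcaster`, `register`, `unregister`, `publish`, `pump` and `stream` are hypothetical names, and the message shape (`type`, `channel`, `data` as bytes) is taken from the code in the question.

```python
import asyncio
from typing import AsyncIterator, Dict, Set


class Broadcaster:
    """Fan out messages from one shared source to per-client queues (hypothetical helper)."""

    def __init__(self) -> None:
        # Maps a channel name to the queues of the clients currently listening to it.
        self._queues: Dict[str, Set[asyncio.Queue]] = {}

    def register(self, channel: str) -> asyncio.Queue:
        """Create a queue for a new client of `channel`."""
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(channel, set()).add(queue)
        return queue

    def unregister(self, channel: str, queue: asyncio.Queue) -> None:
        """Drop a client's queue; forget the channel once it has no clients."""
        clients = self._queues.get(channel)
        if clients is not None:
            clients.discard(queue)
            if not clients:
                del self._queues[channel]

    def publish(self, channel: str, data: str) -> None:
        """Copy one message to every client queue registered for `channel`."""
        for queue in list(self._queues.get(channel, ())):
            queue.put_nowait(data)

    async def pump(self, source: AsyncIterator[dict]) -> None:
        """Read the single shared source; intended to run once per process."""
        async for message in source:
            if message["type"] == "message":
                self.publish(
                    message["channel"].decode("utf-8"),
                    message["data"].decode("utf-8"),
                )

    async def stream(self, channel: str) -> AsyncIterator[str]:
        """Per-request generator; cleans up its queue when the client disconnects."""
        queue = self.register(channel)
        try:
            while True:
                yield await queue.get()
        finally:
            self.unregister(channel, queue)
```

Under these assumptions, the route would return `EventSourceResponse(broadcaster.stream(channel))`, and `broadcaster.pump(pubsub.listen())` would be started once as a background task at application startup. The shared pubsub would still need to be subscribed to each requested channel (for example when its first client registers), which this sketch leaves out.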
