How can I configure ib_insync to run in a FastAPI app?

210 Views Asked by At

I already have IB Gateway setup on my local machine. When I run my app I get the error: "RuntimeError: This event loop is already running". Is it possible to have ib_insync running in a FastAPI app? If so how can I do it? Here is my existing code:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import auth, profile
from decouple import config
from .utils.constants import app_name
from ib_insync import IB

FRONTEND_DOMAIN = config("FRONTEND_DOMAIN")

app = FastAPI()


ib = IB()
ib.connect("127.0.0.1", 7497, clientId=1)


app.add_middleware(
    CORSMiddleware,
    allow_origins=[FRONTEND_DOMAIN],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(auth.router)
app.include_router(profile.router)


@app.get("/")
async def home():
    return {"Welcome to the": f"{app_name} API"}

I also want to be able to access ib from all files in my app. How can I do this?

1

There are 1 best solutions below

0
monachus On

I have this running for both ib_insync and MQTT inside of FastAPI. My startup code looks like this (more or less - I took out stuff not related to the actual startup to show just the relevant bits):

from gmqtt import Client as MQTTClient
from fastapi_mqtt import FastMQTT, MQTTConfig

from ib_insync import IB

# config file
with open('config.yaml') as f:
    config = safe_load(f)

# IBKR
ib = IB()
ib.RequestTimeout = 10.0

mqtt_config = MQTTConfig(
    host=config['MQTT']['BROKER_URL'],
    port=config['MQTT']['PORT']
)
mqtt = FastMQTT(config = mqtt_config)

async def connect_ibkr():
    while True:
        try:
            if not ib.isConnected():
                await ib.connectAsync(config['IBKR']['HOST'], config['IBKR']['PORT'], 0)
        except Exception as e:
            pass

        await asyncio.sleep(5)

@asynccontextmanager
async def lifespan(app: FastAPI):
    '''startup and shutdown events'''
    asyncio.create_task(mqtt.mqtt_startup())
    asyncio.create_task(connect_ibkr())
    yield
    await ib.disconnect()
    await mqtt.mqtt_shutdown()
    print("Shutting down.")

app = FastAPI(lifespan=lifespan)

The lifespan method is the key. It tells FastAPI to start other things when it starts, and then to shut them down when it ends. This starts MQTT and IB in their own loops, and then it automatically connects to IBKR and checks that connection every five seconds, reconnecting if it drops.

I'm new to asyncio, so this may not be the best way to do it, but it's the only way I could figure out to make it work at all.