I already have IB Gateway set up on my local machine. When I run my app I get the error: "RuntimeError: This event loop is already running". Is it possible to have ib_insync running in a FastAPI app? If so, how can I do it? Here is my existing code:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import auth, profile
from decouple import config
from .utils.constants import app_name
from ib_insync import IB

FRONTEND_DOMAIN = config("FRONTEND_DOMAIN")

app = FastAPI()
ib = IB()
ib.connect("127.0.0.1", 7497, clientId=1)

app.add_middleware(
    CORSMiddleware,
    allow_origins=[FRONTEND_DOMAIN],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(auth.router)
app.include_router(profile.router)


@app.get("/")
async def home():
    return {"Welcome to the": f"{app_name} API"}

I also want to be able to access ib from all files in my app. How can I do this?
I have this running for both ib_insync and MQTT inside of FastAPI. My startup code looks like this (more or less - I took out stuff not related to the actual startup to show just the relevant bits):
The lifespan function is the key. It tells FastAPI what else to start when the app starts up, and what to shut down when the app exits. Mine starts MQTT and IB in their own loops, then automatically connects to IBKR and checks that connection every five seconds, reconnecting if it drops.
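A minimal sketch of that pattern, not the exact startup code: it leaves MQTT out and, as a simplification, runs the connection check as a task on FastAPI's own event loop instead of in a separate loop. `make_lifespan` and `keep_connected` are names invented for this example. The host, port and client ID are the ones from the question. It relies on `IB.isConnected()`, `IB.connectAsync()` and `IB.disconnect()` from ib_insync; using the awaitable `connectAsync()` rather than the blocking `connect()` is what should avoid the "event loop is already running" error.

```python
import asyncio
import contextlib
import logging

log = logging.getLogger("ib_watchdog")


async def keep_connected(ib, host="127.0.0.1", port=7497, client_id=1, interval=5.0):
    """Check the connection every `interval` seconds; reconnect if it dropped."""
    while True:
        if not ib.isConnected():
            try:
                # connectAsync() is awaited on the already-running loop,
                # unlike connect(), which tries to run the loop itself.
                await ib.connectAsync(host, port, clientId=client_id)
            except (OSError, asyncio.TimeoutError) as exc:
                log.warning("IB connect failed, will retry: %r", exc)
        await asyncio.sleep(interval)


def make_lifespan(ib_factory, **connect_kwargs):
    """Build a FastAPI lifespan that owns one IB connection (hypothetical helper)."""

    @contextlib.asynccontextmanager
    async def lifespan(app):
        ib = ib_factory()      # created inside the running event loop
        app.state.ib = ib      # shared handle for every router
        task = asyncio.create_task(keep_connected(ib, **connect_kwargs))
        try:
            yield              # the app serves requests here
        finally:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
            ib.disconnect()

    return lifespan


# Wiring it up (requires fastapi and ib_insync to be installed):
#   from fastapi import FastAPI
#   from ib_insync import IB
#   app = FastAPI(lifespan=make_lifespan(IB))
```

Because the instance is stored on `app.state`, a route in any file can reach it by taking a `Request` parameter and reading `request.app.state.ib`, so nothing has to import `ib` from the main module. The app starts serving before the first connection attempt has finished, so a route should check `ib.isConnected()` before using it.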
I'm new to asyncio, so this may not be the best way to do it, but it's the only way I could get it to work at all.