I have a worker fastapi. This api basically creates worker functions and then some data will be processed. My problem arises at the start of these workers. When I first try to make a request, the request stays on hold forever. It doesn't reach the api. When I cancel the request and make another one, everything works perfectly, my workers start.
This is the endpoint which I'm using to create workers:
@app.get("/connectors")
async def start_process(conf: Union[Connector, None] = None):
print(conf)
conf_list = conf.config[0]
if not conf_list:
return {"message": "Empty configuration list"}
# os.environ.update({str(i[0]): i[1] for i in conf_list})
log.info(f"Cassandra Host: {conf_list.cassandra_host}, Type: {type(conf_list.cassandra_host)}")
globalConfig.set(conf_list.cassandra_host,
conf_list.cassandra_port,
conf_list.cassandra_keyspace,
conf_list.kafka_bootstrap_server,
conf_list.topic)
# Check if the process with the same name already exists
existing_process = next((p for p in processes if p["name"] == conf.name), None)
if existing_process:
return {"message": "Connector is not created. Duplicated error."}
p = multiprocessing.Process(target=main, args=(len(processes) + 1,conf_list.connector_class,globalConfig,))
p.start()
processes.append({"p": p, "name": conf.name})
return {"message": f"Started process with ID {p.pid} or {conf.name}"}
This is the body I'm sending to the api:
{
"name":"cassandra-sink-dog1",
"config":[
{
"connector_class":"io.inovasyon.connect.cassandra.ShardedSinkConnector",
"cassandra_host":"192.168.2.165",
"cassandra_port":"9042",
"cassandra_keyspace":"orion_dog",
"kafka_bootstrap_server":"192.168.2.165:29092",
"topic":"eys.orion-dog.entities"
}
]
}
Why is my first request being blocked and I need to cancel it and retry it for it to work?
Edit 1: The entrypoint shown below;
from fastapi import FastAPI
import multiprocessing
from config.Config import Config
from typing import List,Union
from pydantic import BaseModel
from functions.workerReplication import ReplicationSinkConnector
from functions.workerShard import KafkaConsumerSinkConnector
from functions.workerLogstash import KafkaConsumerLogstashConnector
import logging
import coloredlogs
import uvicorn
from termcolor import colored
coloredlogs.install(level="INFO")
log = logging.getLogger("Functions")
app = FastAPI()
processes = []
globalConfig = Config()
class ConnectorConf(BaseModel):
connector_class: str
cassandra_port: str
cassandra_host: str
cassandra_keyspace: str
kafka_bootstrap_server: str
topic: str
class Connector(BaseModel):
name: str
config: List[ConnectorConf]
def main(num,classType,globalConfig):
"""A function that simulates a process"""
log.info(colored("Worker: ","blue")+ "{}".format(num) + colored(" Running","green"))
if "ShardedSinkConnector" in str(classType):
log.info(colored("ShardedSinkConnector: ","green")+ colored(" started.","green"))
KafkaConsumerSinkConnector().workerKafkaShard(globalConfig)
if "ReplicationSinkConnector" in str(classType):
log.info(colored("ReplicationSinkConnector: ","green")+ colored(" started.","green"))
ReplicationSinkConnector.workerCassandraReplication(globalConfig)
if "LogstashSinkConnector" in str(classType):
log.info(colored("LogstashSinkConnector: ","green")+ colored(" started.","green"))
KafkaConsumerLogstashConnector.workerKafkaLogstash(globalConfig,classType)
@app.get("/connectors")
async def start_process(conf: Union[Connector, None] = None):
conf_list = conf.config[0]
# os.environ.update({str(i[0]): i[1] for i in conf_list})
log.info(f"Cassandra Host: {conf_list.cassandra_host}, Type: {type(conf_list.cassandra_host)}")
globalConfig.set(conf_list.cassandra_host,
conf_list.cassandra_port,
conf_list.cassandra_keyspace,
conf_list.kafka_bootstrap_server,
conf_list.topic)
# Check if the process with the same name already exists
existing_process = next((p for p in processes if p["name"] == conf.name), None)
if existing_process:
return {"message": "Connector is not created. Duplicated error."}
p = multiprocessing.Process(target=main, args=(len(processes) + 1,conf_list.connector_class,globalConfig,))
p.start()
processes.append({"p": p, "name": conf.name})
return {"message": f"Started process with ID {p.pid} or {conf.name}"}
@app.get("/connectors/{name}")
async def stop_process(name: Union[str, None] = None):
for idx, p in enumerate(processes):
if p["name"] == name:
print(p["p"])
process_id = p["p"].pid
p = p["p"]
p.terminate()
processes.pop(idx)
log.info(f"Stopped process with ID: {process_id} - {name} Stopped")
# log.info(colored("message: Stopped process with ID ","blue")+ "{}".format(id)+ "\n{}".format(name) + colored(" Stopped","red"))
return {"message": f"Process with ID {id} not found"}
# if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0", port=8000)
# python -m uvicorn CassandraBackend:app --host 0.0.0.0 --port 8000 --reload