I am a Python novice, and I am trying to develop an app using Flask. The app has a feature that needs to execute some processing on the gunicorn master using the server hook and perform other actions on the workers.
During debugging, I found that the app runs normally on Pycharm and exit properly, but when I try to test it on linux server by using Gunicorn, it crashes sometimes. When shutting down the gunicorn master, I always encounter errors. Additionally, workers' threads occasionally hang, and they remain stuck until I kill them using kill -9.
I have tried some solutions I can find by GPT or google, but it seems that none of them can solve these problems.
For example, I tried putting the Redis operations into gevent.spawn_raw(self.shutdown), but I still cannot gracefully shut down my workers.
I have print my code here, I think there must be something That I missed. Could anyone help me for this case?
The Error is :
[2024-03-08 19:22:52 +0800] [16485] [ERROR] Exception in worker process
Traceback (most recent call last):
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
worker.init_process()
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/workers/ggevent.py", line 146, in init_process
super().init_process()
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/workers/base.py", line 142, in init_process
self.run()
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/workers/ggevent.py", line 87, in run
gevent.sleep(1.0)
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/hub.py", line 166, in sleep
hub.wait(t)
File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
File "/data/code/my_demo_app/common/extensions/extension_myapp.py", line 64, in _shutdown_signal_handler
self.shutdown()
File "/data/code/my_demo_app/common/extensions/extension_myapp.py", line 75, in shutdown
self.redis_client.delete("test-key")
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/commands/core.py", line 1712, in delete
return self.execute_command("DEL", *names)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/client.py", line 533, in execute_command
conn = self.connection or pool.get_connection(command_name, **options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/connection.py", line 1086, in get_connection
connection.connect()
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/connection.py", line 264, in connect
sock = self.retry.call_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/retry.py", line 46, in call_with_retry
return do()
^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/connection.py", line 265, in <lambda>
lambda: self._connect(), lambda error: self.disconnect(error)
^^^^^^^^^^^^^^^
File "/data/code/my_demo_app## Heading ##/venv/lib/python3.11/site-packages/redis/connection.py", line 595, in _connect
for res in socket.getaddrinfo(
^^^^^^^^^^^^^^^^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/_socketcommon.py", line 247, in getaddrinfo
addrlist = get_hub().resolver.getaddrinfo(host, port, family, type, proto, flags)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/resolver/thread.py", line 63, in getaddrinfo
return self.pool.apply(_socket.getaddrinfo, args, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/pool.py", line 161, in apply
return self.spawn(func, *args, **kwds).get()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "src/gevent/event.py", line 329, in gevent._gevent_cevent.AsyncResult.get
File "src/gevent/event.py", line 356, in gevent._gevent_cevent.AsyncResult.get
File "src/gevent/_abstract_linkable.py", line 487, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
File "src/gevent/_abstract_linkable.py", line 490, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
File "src/gevent/_abstract_linkable.py", line 442, in gevent._gevent_c_abstract_linkable.AbstractLinkable._AbstractLinkable__wait_to_be_notified
File "src/gevent/_abstract_linkable.py", line 451, in gevent._gevent_c_abstract_linkable.AbstractLinkable._switch_to_hub
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
gevent.exceptions.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback
Here is my test flask app structure, without the other common packages.
__init__.py
# coding:utf-8
import logging
import os
from dotenv import load_dotenv
from flask import Flask
from flask_restx import Api, Namespace, Resource
from common.extensions.extension_logger import FlaskLogger
from configs.myapp_gunicorn_conf import name
from configs.settings import environment_config
from myapp.extensions import cors, test_service
# register api
api = Api(
version='1.0',
title='test-myapp',
default_mediatype='application/json', )
ns: Namespace = Namespace("test-myapp", description="test")
api.add_namespace(ns, path='/myapp')
@ns.route("/test")
class SearchBond(Resource):
@ns.doc("test-app")
def get(self):
return "succeed"
def create_app(config_name=None):
load_dotenv()
app = Flask(name)
if config_name is None:
config_name = os.getenv("ENV", "prod")
app.config.from_object(environment_config[config_name])
register_extensions(app)
return app
def register_extensions(app):
# a customized logger class, nothing to do with this problem
flask_logger = FlaskLogger()
flask_logger.init_app(app=app, logger_name="myapp")
api.init_app(app)
cors.init_app(app)
test_service.init_app(app)
extensions.py
# coding:utf-8
from flask_cors import CORS
from common.extensions.extension_myapp import TestService
cors = CORS()
test_service = TestService()
wsgi.py
# coding:utf-8
from system import create_app
app = create_app()
extension_myapp.py
# coding:utf-8
import logging
import os
import signal
import threading
import redis
# define a test service
class TestService:
def __init__(self):
self.from_whom = ''
self.logger = None
self.app = None
self.app = None
# set signal handler
signal.signal(signal.SIGINT, self._shutdown_signal_handler)
signal.signal(signal.SIGTERM, self._shutdown_signal_handler)
# set shutdown event
self.global_shutdown_event = threading.Event()
# initialize redis client
redis_cp = redis.ConnectionPool(
host='192.168.24.1',
port=6379,
db=0,
password='xxxxx',
decode_responses=True
)
self.redis_client = redis.StrictRedis(connection_pool=redis_cp)
def init_server(self, server):
"""init by gunicorn"""
self.server = server
self.from_whom = 'master'
self._initialize_logs()
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:init server")
heartbeat_thread = threading.Thread(target=self._server_heartbeat, args=(self.global_shutdown_event,))
heartbeat_thread.daemon = True
heartbeat_thread.start()
def init_app(self, app):
"""init by app"""
self.app = app
self.from_whom = 'worker'
self._initialize_logs()
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:init app")
heartbeat_thread = threading.Thread(target=self._app_heartbeat, args=(self.global_shutdown_event,))
heartbeat_thread.daemon = True
heartbeat_thread.start()
pass
def _shutdown_signal_handler(self, signum, frame):
if signum == signal.SIGINT or signum == signal.SIGTERM:
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:ready to shutdown by signal")
self.shutdown()
signal.default_int_handler(signum, frame)
def _shutdown_signal_handler_by_atexit(self):
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:ready to shutdown by atexit")
self.shutdown()
def shutdown(self):
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:invoking shutdown")
self.global_shutdown_event.set()
# presume that there is a key named "test-key" in redis, just confirm to remove it when shutdown
self.redis_client.delete("test-key")
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:all properly shutdown")
def _server_heartbeat(self, shutdown_event):
while not shutdown_event.is_set():
# print a log for demo
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heart beat succeed")
shutdown_event.wait(5)
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heartbeat properly shutdown")
def _app_heartbeat(self, shutdown_event):
while not shutdown_event.is_set():
# print a log for demo
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heart beat succeed")
shutdown_event.wait(5)
self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heartbeat properly shutdown")
def _initialize_logs(self):
"""to initialize a logger"""
if self.app is None:
self.logger = logging.getLogger(f'myapp')
# start by gunicorn master, follow gunicorn settings
log_level = self.server.cfg.settings['loglevel']
log_level = log_level.__dict__['value'].upper() if log_level.__dict__['value'] else 'INFO'
log_level = logging.getLevelName(log_level)
else:
# start by worker, just use app.logger
self.logger = self.app.logger
return
# get the absolute path
script_path = os.path.dirname(os.path.abspath(__file__))
# goto log folder
log_path = os.path.join(script_path, '../logs')
# create if not exists
if not os.path.exists(log_path):
os.mkdir(log_path)
log_file = os.path.join(log_path, 'myapp.log')
# initialize log
self.logger.setLevel(log_level)
fh = logging.FileHandler(log_file, mode='a', encoding='utf-8')
fh.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s - %(lineno)d - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
if self.app is None:
# add hander only for server
self.logger.addHandler(fh)
if os.getenv('ENV', 'test') == 'dev':
# direct print to console in dev
self.logger.addHandler(ch)
self.logger.addHandler(ch)
myapp_gunicorn_conf.py
# coding:utf-8
from common.extensions.extension_myapp import TestService
bind = "0.0.0.0:5005"
workers = 5
worker_class = "gevent"
name = "myapp"
reuse_port = True
forwarded_allow_ips = "*"
proxy_allow_from = "*"
capture_output = False
access_log_format = '%(t)s "%(r)s" %(s)s %(L)s %(b)s %(h)s "%(f)s" "%(a)s"'
accesslog = '-'
graceful_timeout = 120
timeout = 600
loglevel = 'debug'
# start the server side with gunicorn webhook
register_port = int(bind.split(":")[1])
test_service = TestService()
def when_ready(server):
test_service.init_server(server)
def on_exit(server):
if test_service:
test_service.shutdown()

