gunicorn workers not shutdown properly

29 Views Asked by At

I am a Python novice, and I am trying to develop an app using Flask. The app has a feature that needs to execute some processing on the gunicorn master using the server hook and perform other actions on the workers.

During debugging, I found that the app runs normally on Pycharm and exit properly, but when I try to test it on linux server by using Gunicorn, it crashes sometimes. When shutting down the gunicorn master, I always encounter errors. Additionally, workers' threads occasionally hang, and they remain stuck until I kill them using kill -9.

enter image description here

I have tried some solutions I can find by GPT or google, but it seems that none of them can solve these problems.

For example, I tried putting the Redis operations into gevent.spawn_raw(self.shutdown), but I still cannot gracefully shut down my workers.

I have print my code here, I think there must be something That I missed. Could anyone help me for this case?

The Error is :

[2024-03-08 19:22:52 +0800] [16485] [ERROR] Exception in worker process
Traceback (most recent call last):
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/arbiter.py", line 589, in spawn_worker
    worker.init_process()
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/workers/ggevent.py", line 146, in init_process
    super().init_process()
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/workers/base.py", line 142, in init_process
    self.run()
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gunicorn/workers/ggevent.py", line 87, in run
    gevent.sleep(1.0)
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/hub.py", line 166, in sleep
    hub.wait(t)
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
  File "/data/code/my_demo_app/common/extensions/extension_myapp.py", line 64, in _shutdown_signal_handler
    self.shutdown()
  File "/data/code/my_demo_app/common/extensions/extension_myapp.py", line 75, in shutdown
    self.redis_client.delete("test-key")
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/commands/core.py", line 1712, in delete
    return self.execute_command("DEL", *names)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/client.py", line 533, in execute_command
    conn = self.connection or pool.get_connection(command_name, **options)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/connection.py", line 1086, in get_connection
    connection.connect()
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/connection.py", line 264, in connect
    sock = self.retry.call_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
           ^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/redis/connection.py", line 265, in <lambda>
    lambda: self._connect(), lambda error: self.disconnect(error)
            ^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app## Heading ##/venv/lib/python3.11/site-packages/redis/connection.py", line 595, in _connect
    for res in socket.getaddrinfo(
               ^^^^^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/_socketcommon.py", line 247, in getaddrinfo
    addrlist = get_hub().resolver.getaddrinfo(host, port, family, type, proto, flags)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/resolver/thread.py", line 63, in getaddrinfo
    return self.pool.apply(_socket.getaddrinfo, args, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/code/my_demo_app/venv/lib/python3.11/site-packages/gevent/pool.py", line 161, in apply
    return self.spawn(func, *args, **kwds).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "src/gevent/event.py", line 329, in gevent._gevent_cevent.AsyncResult.get
  File "src/gevent/event.py", line 356, in gevent._gevent_cevent.AsyncResult.get
  File "src/gevent/_abstract_linkable.py", line 487, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
  File "src/gevent/_abstract_linkable.py", line 490, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
  File "src/gevent/_abstract_linkable.py", line 442, in gevent._gevent_c_abstract_linkable.AbstractLinkable._AbstractLinkable__wait_to_be_notified
  File "src/gevent/_abstract_linkable.py", line 451, in gevent._gevent_c_abstract_linkable.AbstractLinkable._switch_to_hub
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
  File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
gevent.exceptions.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

Here is my test flask app structure, without the other common packages.

enter image description here

__init__.py

# coding:utf-8
import logging
import os

from dotenv import load_dotenv
from flask import Flask
from flask_restx import Api, Namespace, Resource

from common.extensions.extension_logger import FlaskLogger
from configs.myapp_gunicorn_conf import name
from configs.settings import environment_config
from myapp.extensions import cors, test_service

# register api
api = Api(
    version='1.0',
    title='test-myapp',
    default_mediatype='application/json', )
ns: Namespace = Namespace("test-myapp", description="test")
api.add_namespace(ns, path='/myapp')


@ns.route("/test")
class SearchBond(Resource):
    @ns.doc("test-app")
    def get(self):
        return "succeed"


def create_app(config_name=None):
    load_dotenv()
    app = Flask(name)
    if config_name is None:
        config_name = os.getenv("ENV", "prod")
    app.config.from_object(environment_config[config_name])
    register_extensions(app)
    return app


def register_extensions(app):
    # a customized logger class, nothing to do with this problem
    flask_logger = FlaskLogger()
    flask_logger.init_app(app=app, logger_name="myapp")

    api.init_app(app)
    cors.init_app(app)
    test_service.init_app(app)

extensions.py

# coding:utf-8

from flask_cors import CORS

from common.extensions.extension_myapp import TestService

cors = CORS()
test_service = TestService()

wsgi.py

# coding:utf-8

from system import create_app

app = create_app()

extension_myapp.py

# coding:utf-8
import logging
import os
import signal
import threading

import redis


# define a test service
class TestService:
    def __init__(self):
        self.from_whom = ''
        self.logger = None
        self.app = None
        self.app = None
        
        # set signal handler
        signal.signal(signal.SIGINT, self._shutdown_signal_handler)
        signal.signal(signal.SIGTERM, self._shutdown_signal_handler)

        # set shutdown event
        self.global_shutdown_event = threading.Event()

        # initialize redis client
        redis_cp = redis.ConnectionPool(
            host='192.168.24.1',
            port=6379,
            db=0,
            password='xxxxx',
            decode_responses=True
        )
        self.redis_client = redis.StrictRedis(connection_pool=redis_cp)

    def init_server(self, server):
        """init by gunicorn"""
        self.server = server
        self.from_whom = 'master'
        self._initialize_logs()
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:init server")
        heartbeat_thread = threading.Thread(target=self._server_heartbeat, args=(self.global_shutdown_event,))
        heartbeat_thread.daemon = True
        heartbeat_thread.start()

    def init_app(self, app):
        """init by app"""
        self.app = app
        self.from_whom = 'worker'
        self._initialize_logs()
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:init app")
        heartbeat_thread = threading.Thread(target=self._app_heartbeat, args=(self.global_shutdown_event,))
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        pass

    def _shutdown_signal_handler(self, signum, frame):
        if signum == signal.SIGINT or signum == signal.SIGTERM:
            self.logger.debug(f"{self.from_whom} {threading.current_thread()}:ready to shutdown by signal")
            self.shutdown()
        signal.default_int_handler(signum, frame)

    def _shutdown_signal_handler_by_atexit(self):
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:ready to shutdown by atexit")
        self.shutdown()

    def shutdown(self):
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:invoking shutdown")
        self.global_shutdown_event.set()
        # presume that there is a key named "test-key" in redis, just confirm to remove it when shutdown
        self.redis_client.delete("test-key")
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:all properly shutdown")

    def _server_heartbeat(self, shutdown_event):
        while not shutdown_event.is_set():
            # print a log for demo
            self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heart beat succeed")
            shutdown_event.wait(5)
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heartbeat properly shutdown")

    def _app_heartbeat(self, shutdown_event):
        while not shutdown_event.is_set():
            # print a log for demo
            self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heart beat succeed")
            shutdown_event.wait(5)
        self.logger.debug(f"{self.from_whom} {threading.current_thread()}:heartbeat properly shutdown")

    def _initialize_logs(self):
        """to initialize a logger"""
        if self.app is None:
            self.logger = logging.getLogger(f'myapp')
            # start by gunicorn master, follow gunicorn settings
            log_level = self.server.cfg.settings['loglevel']
            log_level = log_level.__dict__['value'].upper() if log_level.__dict__['value'] else 'INFO'
            log_level = logging.getLevelName(log_level)
        else:
            # start by worker, just use app.logger
            self.logger = self.app.logger
            return
        # get the absolute path
        script_path = os.path.dirname(os.path.abspath(__file__))
        # goto log folder
        log_path = os.path.join(script_path, '../logs')
        # create if not exists
        if not os.path.exists(log_path):
            os.mkdir(log_path)
        log_file = os.path.join(log_path, 'myapp.log')
        # initialize log
        self.logger.setLevel(log_level)
        fh = logging.FileHandler(log_file, mode='a', encoding='utf-8')
        fh.setLevel(log_level)
        ch = logging.StreamHandler()
        ch.setLevel(log_level)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(filename)s - %(lineno)d - %(message)s')
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
        if self.app is None:
            # add hander only for server
            self.logger.addHandler(fh)
            if os.getenv('ENV', 'test') == 'dev':
                # direct print to console in dev
                self.logger.addHandler(ch)
            self.logger.addHandler(ch)

myapp_gunicorn_conf.py

# coding:utf-8
from common.extensions.extension_myapp import TestService

bind = "0.0.0.0:5005"
workers = 5
worker_class = "gevent"
name = "myapp"
reuse_port = True

forwarded_allow_ips = "*"
proxy_allow_from = "*"

capture_output = False
access_log_format = '%(t)s "%(r)s" %(s)s %(L)s %(b)s %(h)s "%(f)s" "%(a)s"'
accesslog = '-'

graceful_timeout = 120
timeout = 600
loglevel = 'debug'

# start the server side with gunicorn webhook
register_port = int(bind.split(":")[1])
test_service = TestService()


def when_ready(server):
    test_service.init_server(server)


def on_exit(server):
    if test_service:
        test_service.shutdown()

0

There are 0 best solutions below