Race condition and interleaving when Struclog logger is used within Celery tasks

520 Views Asked by At

I use structlog and celery in my Django application and I'm having hard time when logging tasks with structlog in the console. Indeed, events are not properly aligned when printed in the console when a Celery task is being executed.

How can I fix the misalignment ?

EDIT

As you can, there is interleaving in the console when printing Task Events from django-structlog. Same thing happen if I print the logs from the applications.

Console

$ docker-compose logs -f


celery_worker-1  | 2022-10-14T15:21:11.064644Z [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=edad4137-4b9b-4f80-b335-65b9728afcc2 task_id=edad4137-4b9b-4f80-b335-65b9728afcc2
celery_worker-1  | 2022-10-14T15:21:15.224614Z [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=c2fa7030-7583-467f-96fa-0ff04b238cd7 child_task_name=Account______Fetch orders ip=178.238.xxx.ppp request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=3ee2d948-c179-43b4-8ec9-18e5a4d93327 user_id=1
celery_worker-1  | 2022-10-14T15:21:15.304021Z [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=f5db5a8d-d9fc-4238-b701-aefbf7412d08 child_task_name=Account______Fetch orders ip=178.238.xxx.ppp request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=3ee2d948-c179-43b4-8ec9-18e5a4d93327 user_id=1
celery_worker-1  | 2022-10-14T15:21:15.343569Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=3ee2d948-c179-43b4-8ec9-18e5a4d93327 user_id=1
celery_worker-1  | 2022-10-14T15:21:17.036935Z [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id2022-10-14T15:21:17.039814Z=bfdbad61-5a09-41a1-b96d-1fe13c8b6cf0 child_task_name=PnL_____Update_asset_inventory ip=178.238.xxx.ppp parent_task_id=f5db5a8d-d9fc-4238-b701-aefbf7412d08 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=c5c6a3d8-d371-45c5-bc15-d555c40c9fe7 user_id=1
celery_worker-1  |  [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=c5c6a3d8-d371-45c5-bc15-d555c40c9fe7 child_task_name=PnL_____Update_inventories ip=178.238.xxx.ppp parent_task_id=3ee2d948-c179-43b4-8ec9-18e5a4d93327 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=f5db5a8d-d9fc-4238-b701-aefbf7412d08 user_id=1
celery_worker-1  | 2022-10-14T15:21:17.058236Z2022-10-14T15:21:17.059109Z [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=92a94e61-0d86-4e12-be02-e54fda559df3  [info     child_task_name] =task_succeeded                PnL_____Update_contract_inventory [ django_structlog.celery.receivers] ipip=178.238.xxx.ppp parent_task_id==178.238.xxx.ppp f5db5a8d-d9fc-4238-b701-aefbf7412d08parent_task_id =request_id3ee2d948-c179-43b4-8ec9-18e5a4d93327 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=f5db5a8d-d9fc-4238-b701-aefbf7412d08= 03a1f3ec-4a96-4bac-831f-630b9881fd41user_id =task_id1
celery_worker-1  | =c5c6a3d8-d371-45c5-bc15-d555c40c9fe7 user_id=1
celery_worker-1  | 2022-10-14T15:21:17.077882Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=f5db5a8d-d9fc-4238-b701-aefbf7412d08 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=c5c6a3d8-d371-45c5-bc15-d555c40c9fe7 user_id=1
celery_worker-1  | 2022-10-14T15:21:17.149324Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=c5c6a3d8-d371-45c5-bc15-d555c40c9fe7 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=92a94e61-0d86-4e12-be02-e54fda559df3 user_id=1
celery_worker-1  | 2022-10-14T15:21:17.171073Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=c5c6a3d8-d371-45c5-bc15-d555c40c9fe7 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=bfdbad61-5a09-41a1-b96d-1fe13c8b6cf0 user_id=1
celery_worker-1  | 2022-10-14T15:21:18.477145Z [info     2022-10-14T15:21:18.478784Z] task_enqueued                  [django_structlog.celery.receivers] child_task_id=99175fd3-12c7-4354-bdce-60dce014b888 child_task_name=PnL_____Update_asset_inventory ip=178.238.xxx.ppp parent_task_id=c2fa7030-7583-467f-96fa-0ff04b238cd7 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=9fc68b47-4c56-44bc-a5c1-5df1062c98ef user_id=1
celery_worker-1  |  [info     ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=9fc68b47-4c56-44bc-a5c1-5df1062c98ef child_task_name=PnL_____Update_inventories ip=178.238.xxx.ppp parent_task_id=3ee2d948-c179-43b4-8ec9-18e5a4d93327 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=c2fa7030-7583-467f-96fa-0ff04b238cd7 user_id=1
celery_worker-1  | 2022-10-14T15:21:18.504804Z2022-10-14T15:21:18.505474Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=3ee2d948-c179-43b4-8ec9-18e5a4d93327 request_id [=info     03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=c2fa7030-7583-467f-96fa-0ff04b238cd7 user_id=1
celery_worker-1  | ] task_enqueued                  [django_structlog.celery.receivers] child_task_id=27211ff3-15df-4d2f-a040-a4d7acf7441f child_task_name=PnL_____Update_contract_inventory ip=178.238.xxx.ppp parent_task_id=c2fa7030-7583-467f-96fa-0ff04b238cd7 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=9fc68b47-4c56-44bc-a5c1-5df1062c98ef user_id=1
celery_worker-1  | 2022-10-14T15:21:18.541556Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=c2fa7030-7583-467f-96fa-0ff04b238cd7 request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=9fc68b47-4c56-44bc-a5c1-5df1062c98ef user_id=1
celery_worker-1  | 2022-10-14T15:21:18.581544Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=9fc68b47-4c56-44bc-a5c1-5df1062c98ef request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=99175fd3-12c7-4354-bdce-60dce014b888 user_id=1
celery_worker-1  | 2022-10-14T15:21:18.620567Z [info     ] task_succeeded                 [django_structlog.celery.receivers] ip=178.238.xxx.ppp parent_task_id=9fc68b47-4c56-44bc-a5c1-5df1062c98ef request_id=03a1f3ec-4a96-4bac-831f-630b9881fd41 task_id=27211ff3-15df-4d2f-a040-a4d7acf7441f user_id=1

Packages version

celery==5.2.7  # latest
django-structlog==3.0.1  # latest
structlog==22.1.0  # latest

tasks.py

import structlog

logger = structlog.get_logger(__name__)


@app.task(bind=True, name='Update_inventory')
def update_inventory(self, pk):

    account = Account.objects.get(pk=pk)
    log_asset = logger.bind(account=account.name)
    log_asset.info('Log event')
    ...

celery.py

import structlog
from django_structlog.celery.steps import DjangoStructLogInitStep

app.steps['worker'].add(DjangoStructLogInitStep)


@setup_logging.connect
def receiver_setup_logging(loglevel, logfile, format, colorize, **kwargs):  # pragma: no cover
    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "json_formatter": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": structlog.processors.JSONRenderer(),
                    "foreign_pre_chain": [
                        structlog.contextvars.merge_contextvars,
                        structlog.processors.TimeStamper(fmt="iso"),
                        structlog.stdlib.add_logger_name,
                        structlog.stdlib.add_log_level,
                        structlog.stdlib.PositionalArgumentsFormatter(),
                    ],
                },
                "plain_console": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": structlog.dev.ConsoleRenderer(),
                },
                "key_value": {
                    "()": structlog.stdlib.ProcessorFormatter,
                    "processor": structlog.processors.KeyValueRenderer(
                        key_order=['timestamp', 'level', 'event', 'logger']),
                },
            },
            "handlers": {
                "console": {
                    "class": "logging.StreamHandler",
                    "formatter": "plain_console",
                },
                "json_file": {
                    "class": "logging.handlers.WatchedFileHandler",
                    "filename": "logs/json.log",
                    "formatter": "json_formatter",
                },
                "flat_line_file": {
                    "class": "logging.handlers.WatchedFileHandler",
                    "filename": "logs/flat_line.log",
                    "formatter": "key_value",
                },
            },
            "loggers": 
                "django_structlog": {
                    "handlers": ["console", "flat_line_file", "json_file"],
                    "level": "INFO",
                    'propagate': False
                },
            }
        }
    )

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.filter_by_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

settings.py

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'json_formatter': {
            '()': structlog.stdlib.ProcessorFormatter,
            'processor': structlog.processors.JSONRenderer(sort_keys=False),
        },
        'plain_console': {
            '()': structlog.stdlib.ProcessorFormatter,
            'processor': structlog.dev.ConsoleRenderer(pad_event=43, colors=True, force_colors=True),
        },
        'key_value': {
            '()': structlog.stdlib.ProcessorFormatter,
            'processor': structlog.processors.KeyValueRenderer(key_order=['level',
                                                                          'logger',
                                                                          'event',
                                                                          'timestamp'],
                                                               sort_keys=False
                                                               ),
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'plain_console',
        },
        'json_file': {
            'class': 'logging.handlers.WatchedFileHandler',
            'filename': 'logs/json.log',
            'formatter': 'json_formatter',
        },
        'flat_line_file': {
            'class': 'logging.handlers.WatchedFileHandler',
            'filename': 'logs/flat_line.log',
            'formatter': 'key_value',
        },
    },
    'loggers': {
        'django-structlog': {
            'handlers': ['console', 'flat_line_file', 'json_file'],
            'level': 'WARNING',
            'propagate': False,
        }
    }
}

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.filter_by_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
0

There are 0 best solutions below