Flask-APScheduler on Flask missing jobs without logging

72 Views Asked by At

I'm to developing a simple interface using Apscheduler and Flask, it works fine and the cron is on point, but in the next day the job doesnt run after the next_run_time is updated. I have a jobstore inside my db named apscheduler_jobs created by the module that stores the id, next_run_time and job_state, the next_run_time its updated to the next day but when the time comes it doesnt start the job_function (the flask server was on all the time)

I have a flask route that passes the execution of a script with args

**routes.py**
from datetime import datetime
import os
from random import randint
import subprocess
from flask import flash, jsonify, redirect, request, render_template, url_for
import pytz
from app import create_app
from apscheduler.jobstores.base import JobLookupError
app, db, scheduler = create_app()

#TODO Criar arquivo isolado models.py
class Processos(db.Model):
    __tablename__ = 'processos'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    documento = db.Column(db.String(50), nullable=False)
    data_inicio = db.Column(db.Date, nullable=True)
    data_fim = db.Column(db.Date, nullable=True)
    status = db.Column(db.String(50), nullable=True)
    prioridade = db.Column(db.Integer, nullable=False)
    departamento = db.Column(db.String(50), nullable=True)
    tempo_gasto = db.Column(db.Date, nullable=True)
    data_criacao = db.Column(db.Date, nullable=False)
    responsavel = db.Column(db.String(50), nullable=True)

class APSchedulerJob(db.Model):
    __tablename__ = 'apscheduler_jobs'
    
    id = db.Column(db.String(191), primary_key=True)
    next_run_time = db.Column(db.Float)
    job_state = db.Column(db.LargeBinary)

def job_function(script):
    try:
        subprocess.run([r'C:\RPA\flask_scheduler\venv\Scripts\python.exe', script], check=True)
        #flash(f"Script {script} executado!")
    except subprocess.CalledProcessError as e:
        print(f"Erro ao executar o script crawler.py: {e}")

@app.route('/schedule/<int:id>', methods=['POST'])
def schedule(id)
    filename = db.session.query(Processos).filter(Processos.id == id).first().documento
    months = request.form.get('month')
    days = request.form.getlist('days')
    time = request.form.get('time')
    dates = request.form.getlist('dates')
    interval = request.form.get('interval')
    
    script_path = os.path.join(app.config['UPLOAD_PATH'], filename)
    
    if filename:
        if dates[0]:
            unique_dates = set(dates[0].split(','))  # Remove duplicatas
            months = set()
            days = set()

            for date in unique_dates:
                date_obj = datetime.strptime(date, '%Y-%m-%d')
                months.add(date_obj.month)
                days.add(date_obj.day)
            
            try:
                scheduler.add_job(id = str(id), func = job_function, args = [script_path], trigger = 'cron', month = ','.join(map(str, months)), day = ','.join(map(str, days)), hour = str(time)[:2], minute = str(time)[3:])
                flash("Job agendado","success")
            except:
                scheduler.modify_job(id = str(id), func = job_function, args = [script_path], trigger = 'cron', month = ','.join(map(str, months)), day = ','.join(map(str, days)), hour = str(time)[:2], minute = str(time)[3:])
                flash("Job reagendado","success")

            return redirect(url_for('index'))

Inside the app.py fabric I have a jobstore configured like this:

from flask import Flask
from flask_apscheduler import APScheduler
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def create_app():
    app = Flask(__name__)
    app.secret_key = "0d(&*^*5bas34jk(&^))"
    app.config['UPLOAD_EXTENSIONS'] = ['.py']
    app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 2
    app.config['UPLOAD_PATH'] = 'uploads'
    app.config['SQLALCHEMY_DATABASE_URI'] = DB_URL
    app.config['SCHEDULER_JOBSTORES'] = {
        "default": SQLAlchemyJobStore(url=DB_URL)
    }

    class Config:
        SCHEDULER_API_ENABLED = True

    app.config.from_object(Config())

    db = SQLAlchemy(app)
    migrate = Migrate(app, db)
    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()
    
    return app, db, scheduler

Its like the apsheduler couldnt fire the job after the first time

I tried to increase the logging level to debug and its all alright, but again, when the next run time comes it doesnt fire

Logger output:

DEBUG:apscheduler.scheduler:Looking for jobs to run INFO:apscheduler.executors.default:Running job "13 (trigger: cron[day_of_week='mon,tue,wed,thu,fri,sat,sun', hour='12', minute='6'], next run at: 2023-10-30 12:06:00 -03)" (scheduled at 2023-10-30 12:06:00-03:00) DEBUG:apscheduler.scheduler:Next wakeup is due at 2023-10-31 12:06:00-03:00 (in 86398.820154 seconds)

0

There are 0 best solutions below