
EVENT_JOB_SUBMITTED fires when job has not executed due to ThreadPoolExecutor(max_workers=1) #356

Open
ecbftw opened this issue Jan 28, 2019 · 1 comment


ecbftw commented Jan 28, 2019

I'm finding that the EVENT_JOB_SUBMITTED event fires even for jobs that have not actually started executing, when I configure a ThreadPoolExecutor with max_workers=1.

What I'm really trying to do is track job execution state. I want to know when a job is in one of the following states (my own nomenclature): "scheduled", "queued", "running", "completed" or "error". Since apscheduler does not provide a direct way to do this (see #332), I'm trying to use event handlers to track the state of each job. This is turning out to be remarkably difficult.
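For concreteness, here is a minimal, library-agnostic sketch of the kind of state tracker I have in mind. The class name and the transition table are my own invention, not anything apscheduler provides:

```python
import time

# Allowed transitions for the nomenclature above (my own model).
TRANSITIONS = {
    'scheduled': {'queued'},
    'queued': {'running'},
    'running': {'completed', 'error'},
}

class JobStateTracker:
    def __init__(self):
        self.jobs = {}  # job_id -> {'status': ..., 'updated': ...}

    def set_state(self, job_id, status):
        current = self.jobs.get(job_id, {}).get('status')
        # Reject transitions that skip a state (e.g. queued -> completed),
        # which is exactly what a premature EVENT_JOB_SUBMITTED would cause.
        if current is not None and status not in TRANSITIONS.get(current, set()):
            raise ValueError(f'illegal transition {current} -> {status} for {job_id}')
        self.jobs[job_id] = {'status': status, 'updated': time.time()}

tracker = JobStateTracker()
tracker.set_state('backup', 'scheduled')
tracker.set_state('backup', 'queued')
tracker.set_state('backup', 'running')
tracker.set_state('backup', 'completed')
print(tracker.jobs['backup']['status'])  # completed
```

The point of the transition table is that an event handler wired to the wrong event would immediately trip the ValueError instead of silently recording a wrong state.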

By design, my application may run one of 4 different job types (job_ids), but I want only one of these jobs to run at a time. (In the future, I may expand the number/types of jobs that can run in parallel.) For now, I have set ThreadPoolExecutor's max_workers=1, and I have also set the job_defaults of coalesce=True and max_instances=1. In order to queue up misfired jobs for the future, I set misfire_grace_time to a large value. This has the desired effect of queuing up all jobs behind other jobs of different types/job_ids.

Expected Behavior

Given this simple, effectively single-threaded design, I expect to be able to track state of jobs without race conditions or other issues.

Current Behavior

Unfortunately, apscheduler fails to check whether the ThreadPoolExecutor's max_workers setting will cause a job to misfire, and EVENT_JOB_SUBMITTED is fired even if the job subsequently misfires and is re-evaluated for execution later.

Steps to Reproduce

Here are a few snippets of code that I'm able to share. Scheduler setup:

        jobstores = {
            'default': MemoryJobStore()
        }
        executors = {
            'default': ThreadPoolExecutor(max_workers=1)
        }
        job_defaults = {
            'coalesce': True,
            'max_instances': 1,
            'misfire_grace_time': 60*60*4
        }
        self.scheduler = TornadoScheduler()
        self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
        self.scheduler.add_listener(self.on_job_start, apscheduler.events.EVENT_JOB_SUBMITTED)
        self.scheduler.start()

This method is fired prematurely, causing my job to be listed as 'running' when it isn't.

    def on_job_start(self, event):
        logging.debug('*** starting job: ' + event.job_id)
        self.current_jobs[event.job_id]['status'] = 'running'
        self.current_jobs[event.job_id]['started'] = time.time()

When I add a non-scheduled job, I set up the status as 'queued', but this is always overridden immediately by the event handler above.

        self.current_jobs[job_id] = {'job_id':job_id, 'task':task,..., 'status':'queued'}
        self.scheduler.add_job(runner,
                               id=job_id,
                               replace_existing=True,
                               kwargs=kwargs)

Context (Environment)

I'm unable to properly track the state of my jobs without more hacks. I'm likely going to need to add a wrapper function to each of my jobs that simply updates its state to 'running', and abandon the EVENT_JOB_SUBMITTED event handler. If apscheduler provided a more complete API for tracking job state, that would be the best situation of all.
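A rough, standalone sketch of that wrapper workaround (the `current_jobs` dict and status names are from my code above; `track_running` is a name I made up):

```python
import functools
import time

def track_running(current_jobs, job_id, func):
    """Wrap a job callable so its state flips to 'running' only when the
    executor actually invokes it, not when it is merely submitted."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        current_jobs[job_id]['status'] = 'running'
        current_jobs[job_id]['started'] = time.time()
        try:
            result = func(*args, **kwargs)
            current_jobs[job_id]['status'] = 'completed'
            return result
        except Exception:
            current_jobs[job_id]['status'] = 'error'
            raise
    return wrapper

# Usage: schedule the wrapped callable instead of the raw one, e.g.
#   self.scheduler.add_job(track_running(self.current_jobs, job_id, runner),
#                          id=job_id, replace_existing=True)
jobs = {'export': {'status': 'queued'}}
wrapped = track_running(jobs, 'export', lambda: 42)
print(wrapped(), jobs['export']['status'])  # 42 completed
```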

Detailed Description

In the short term, it would be nice if EVENT_JOB_SUBMITTED were not fired when ThreadPoolExecutor's max_workers limit causes a misfire. Longer term, I'd love to see a solution for #332.

@ljluestc

    import logging
    import time

    from apscheduler.schedulers.tornado import TornadoScheduler
    from apscheduler.jobstores.memory import MemoryJobStore
    from apscheduler.events import EVENT_JOB_SUBMITTED
    from apscheduler.executors.pool import ThreadPoolExecutor

    class JobManager:
        def __init__(self):
            self.current_jobs = {}
            self._setup_scheduler()

        def _setup_scheduler(self):
            jobstores = {'default': MemoryJobStore()}
            executors = {'default': ThreadPoolExecutor(max_workers=1)}
            job_defaults = {'coalesce': True, 'max_instances': 1, 'misfire_grace_time': 60*60*4}

            self.scheduler = TornadoScheduler()
            self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
            self.scheduler.add_listener(self.on_job_start, EVENT_JOB_SUBMITTED)
            self.scheduler.start()

        def on_job_start(self, event):
            # EVENT_JOB_SUBMITTED only means the job was handed to the
            # executor, so record it as 'queued' rather than 'running'.
            logging.debug('*** Job submitted: ' + event.job_id)
            self.current_jobs[event.job_id]['status'] = 'queued'

        def job_wrapper(self, func, job_id, *args, **kwargs):
            # This runs inside the executor thread, so it marks the point
            # where the job has actually started.
            try:
                self.current_jobs[job_id]['status'] = 'running'
                self.current_jobs[job_id]['started'] = time.time()
                result = func(*args, **kwargs)
                self.current_jobs[job_id]['status'] = 'completed'
                return result
            except Exception as e:
                self.current_jobs[job_id]['status'] = 'error'
                logging.error(f"Error in job {job_id}: {e}")
                # Handle the exception as needed

        def add_job(self, func, job_id, *args, **kwargs):
            wrapped_func = lambda: self.job_wrapper(func, job_id, *args, **kwargs)
            self.current_jobs[job_id] = {'job_id': job_id, 'status': 'queued'}
            self.scheduler.add_job(wrapped_func, id=job_id, replace_existing=True)